Remove Accounts Postgres

This commit is contained in:
Chris Eager
2021-09-20 11:10:24 -07:00
committed by GitHub
parent 8161f55a82
commit 2a67b2e610
29 changed files with 253 additions and 2906 deletions

View File

@@ -46,9 +46,7 @@ import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
@@ -62,13 +60,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchManager;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.WebsocketRefreshApplicationEventListener;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.CertificateGenerator;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator;
import org.whispersystems.textsecuregcm.auth.WebsocketRefreshApplicationEventListener;
import org.whispersystems.textsecuregcm.badges.ConfiguredProfileBadgeConverter;
import org.whispersystems.textsecuregcm.badges.ProfileBadgeConverter;
import org.whispersystems.textsecuregcm.configuration.DirectoryServerConfiguration;
@@ -156,9 +154,7 @@ import org.whispersystems.textsecuregcm.storage.AccountCleaner;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ContactDiscoveryWriter;
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
@@ -174,11 +170,6 @@ import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccountsTableCrawler;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccountsTableCrawler;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
@@ -211,14 +202,12 @@ import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand;
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
import org.whispersystems.websocket.setup.WebSocketEnvironment;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.s3.S3Client;
@@ -228,7 +217,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
@Override
public void initialize(Bootstrap<WhisperServerConfiguration> bootstrap) {
bootstrap.addCommand(new VacuumCommand());
bootstrap.addCommand(new DeleteUserCommand());
bootstrap.addCommand(new CertificateCommand());
bootstrap.addCommand(new ZkParamsCommand());
@@ -243,7 +231,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
}
});
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("abusedb", "abusedb.xml") {
@Override
public PooledDataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
@@ -316,20 +303,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(config.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
// The thread pool core & max sizes are set via dynamic configuration within AccountsDynamoDb
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(config.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
accountsDynamoDbMigrationThreadPool);
DynamoDbClient deletedAccountsDynamoDbClient = DynamoDbFromConfig.client(config.getDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient recentlyDeletedAccountsDynamoDb = DynamoDbFromConfig.client(config.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pushChallengeDynamoDbClient = DynamoDbFromConfig.client(
config.getPushChallengeDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -338,14 +314,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(
config.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationMismatchedAccountsDynamoDb = DynamoDbFromConfig.client(
config.getMigrationMismatchedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingAccountsDynamoDbClient = DynamoDbFromConfig.client(
config.getPendingAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -366,19 +334,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient,
config.getDeletedAccountsDynamoDbConfiguration().getTableName(),
config.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb,
config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb,
config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
MigrationMismatchedAccounts mismatchedAccounts = new MigrationMismatchedAccounts(
migrationMismatchedAccountsDynamoDb,
config.getMigrationMismatchedAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient,
accountsDynamoDbMigrationThreadPool, config.getAccountsDynamoDbConfiguration().getTableName(),
config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts,
migrationRetryAccounts);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient,
config.getAccountsDynamoDbConfiguration().getTableName(),
config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName()
);
Usernames usernames = new Usernames(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
@@ -431,8 +391,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExecutorService donationExecutor = environment.lifecycle().executorService(name(getClass(), "donation-%d")).maxThreads(1).minThreads(1).build();
ExecutorService multiRecipientMessageExecutor = environment.lifecycle()
.executorService(name(getClass(), "multiRecipientMessage-%d")).minThreads(64).maxThreads(64).build();
ExecutorService accountsCrawlerChunkPreReadExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountsCrawler-%d")).maxThreads(2).minThreads(2).build();
ExternalServiceCredentialGenerator directoryCredentialsGenerator = new ExternalServiceCredentialGenerator(config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenSharedSecret(),
config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenUserIdSecret(),
@@ -464,9 +422,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient, config.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, mismatchedAccounts, usernamesManager,
profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager,
AccountsManager accountsManager = new AccountsManager(accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager,
profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient,
dynamicConfigurationManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
@@ -539,27 +497,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs(),
accountsCrawlerChunkPreReadExecutor,
dynamicConfigurationManager);
AccountDatabaseCrawlerCache dynamoDbMigrationCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
dynamoDbMigrationCrawlerCache.setPrefix("DynamoMigration");
AccountDatabaseCrawler accountDynamoDbMigrationCrawler = new AccountDatabaseCrawler(accountsManager,
dynamoDbMigrationCrawlerCache,
List.of(new AccountsDynamoDbMigrator(accountsDynamoDb, dynamicConfigurationManager)),
config.getDynamoDbMigrationCrawlerConfiguration().getChunkSize(),
config.getDynamoDbMigrationCrawlerConfiguration().getChunkIntervalMs(),
accountsCrawlerChunkPreReadExecutor,
dynamicConfigurationManager);
accountDynamoDbMigrationCrawler.setDedicatedDynamoMigrationCrawler(true);
config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()
);
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(
migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);
MigrationMismatchedAccountsTableCrawler migrationMismatchedAccountsTableCrawler = new MigrationMismatchedAccountsTableCrawler(
mismatchedAccounts, accountsManager, accounts, accountsDynamoDb, dynamicConfigurationManager, cacheCluster,
recurringJobExecutor);
apnSender.setApnFallbackManager(apnFallbackManager);
environment.lifecycle().manage(new ApplicationShutdownMonitor());
@@ -567,10 +508,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(pubSubManager);
environment.lifecycle().manage(messageSender);
environment.lifecycle().manage(accountDatabaseCrawler);
environment.lifecycle().manage(accountDynamoDbMigrationCrawler);
environment.lifecycle().manage(deletedAccountsTableCrawler);
environment.lifecycle().manage(migrationRetryAccountsTableCrawler);
environment.lifecycle().manage(migrationMismatchedAccountsTableCrawler);
environment.lifecycle().manage(remoteConfigsManager);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);

View File

@@ -1,112 +1,15 @@
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
boolean dynamoPrimary;
@JsonProperty
boolean backgroundMigrationEnabled;
@JsonProperty
int backgroundMigrationExecutorThreads = 1;
@JsonProperty
boolean deleteEnabled;
@JsonProperty
boolean writeEnabled;
@JsonProperty
boolean readEnabled;
@JsonProperty
boolean postCheckMismatches;
@JsonProperty
boolean logMismatches;
@JsonProperty
boolean crawlerPreReadNextChunkEnabled;
@JsonProperty
boolean dynamoCrawlerEnabled;
@JsonProperty
int dynamoCrawlerScanPageSize = 10;
public boolean isDynamoPrimary() {
return dynamoPrimary;
}
public boolean isBackgroundMigrationEnabled() {
return backgroundMigrationEnabled;
}
public int getBackgroundMigrationExecutorThreads() {
return backgroundMigrationExecutorThreads;
}
@VisibleForTesting
public void setBackgroundMigrationEnabled(boolean backgroundMigrationEnabled) {
this.backgroundMigrationEnabled = backgroundMigrationEnabled;
}
public void setDeleteEnabled(boolean deleteEnabled) {
this.deleteEnabled = deleteEnabled;
}
public boolean isDeleteEnabled() {
return deleteEnabled;
}
public void setWriteEnabled(boolean writeEnabled) {
this.writeEnabled = writeEnabled;
}
public boolean isWriteEnabled() {
return writeEnabled;
}
@VisibleForTesting
public void setReadEnabled(boolean readEnabled) {
this.readEnabled = readEnabled;
}
public boolean isReadEnabled() {
return readEnabled;
}
public boolean isPostCheckMismatches() {
return postCheckMismatches;
}
public boolean isLogMismatches() {
return logMismatches;
}
public boolean isCrawlerPreReadNextChunkEnabled() {
return crawlerPreReadNextChunkEnabled;
}
public boolean isDynamoCrawlerEnabled() {
return dynamoCrawlerEnabled;
}
// TODO move out of "migration" configuration
public int getDynamoCrawlerScanPageSize() {
return dynamoCrawlerScanPageSize;
}
@VisibleForTesting
public void setLogMismatches(boolean logMismatches) {
this.logMismatches = logMismatches;
}
@VisibleForTesting
public void setBackgroundMigrationExecutorThreads(int threads) {
this.backgroundMigrationExecutorThreads = threads;
}
}

View File

@@ -14,7 +14,6 @@ import io.dropwizard.lifecycle.Managed;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,32 +40,22 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private final String workerId;
private final AccountDatabaseCrawlerCache cache;
private final List<AccountDatabaseCrawlerListener> listeners;
private final ExecutorService chunkPreReadExecutorService;
private final DynamicConfigurationManager dynamicConfigurationManager;
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
// temporary to control behavior during the Postgres → Dynamo transition
private boolean dedicatedDynamoMigrationCrawler;
public AccountDatabaseCrawler(AccountsManager accounts,
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
int chunkSize,
long chunkIntervalMs,
ExecutorService chunkPreReadExecutorService,
DynamicConfigurationManager dynamicConfigurationManager) {
long chunkIntervalMs) {
this.accounts = accounts;
this.chunkSize = chunkSize;
this.chunkIntervalMs = chunkIntervalMs;
this.workerId = UUID.randomUUID().toString();
this.cache = cache;
this.listeners = listeners;
this.chunkPreReadExecutorService = chunkPreReadExecutorService;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
@@ -131,25 +120,19 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
try (Timer.Context timer = processChunkTimer.time()) {
final boolean useDynamo = !dedicatedDynamoMigrationCrawler && dynamicConfigurationManager.getConfiguration()
.getAccountsDynamoDbMigrationConfiguration()
.isDynamoCrawlerEnabled();
final Optional<UUID> fromUuid = getLastUuid(useDynamo);
final Optional<UUID> fromUuid = getLastUuid();
if (fromUuid.isEmpty()) {
logger.info("Started crawl");
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
}
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize, useDynamo);
primeDatabaseForNextChunkAsync(chunkAccounts.getLastUuid(), chunkSize, useDynamo);
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize);
if (chunkAccounts.getAccounts().isEmpty()) {
logger.info("Finished crawl");
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
cacheLastUuid(Optional.empty(), useDynamo);
cacheLastUuid(Optional.empty());
cache.setAccelerated(false);
} else {
logger.info("Processing chunk");
@@ -157,70 +140,42 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
}
cacheLastUuid(chunkAccounts.getLastUuid(), useDynamo);
cacheLastUuid(chunkAccounts.getLastUuid());
} catch (AccountDatabaseCrawlerRestartException e) {
cacheLastUuid(Optional.empty(), useDynamo);
cacheLastUuid(Optional.empty());
cache.setAccelerated(false);
}
}
}
}
/**
* This is an optimization based on the observation that cold reads of chunks are slow, but subsequent reads of the
* same chunk (within a few minutes) are fast. We cant easily store the actual result data, since the next chunk
* might be processed elsewhere, but the time savings are still substantial.
*/
private void primeDatabaseForNextChunkAsync(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isCrawlerPreReadNextChunkEnabled()) {
if (!useDynamo && fromUuid.isPresent()) {
chunkPreReadExecutorService.submit(() -> readChunk(fromUuid, chunkSize, false, preReadChunkTimer));
}
}
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize) {
return readChunk(fromUuid, chunkSize, readChunkTimer);
}
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo) {
return readChunk(fromUuid, chunkSize, useDynamo, readChunkTimer);
}
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, boolean useDynamo, Timer readTimer) {
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, Timer readTimer) {
try (Timer.Context timer = readTimer.time()) {
if (fromUuid.isPresent()) {
return useDynamo
? accounts.getAllFromDynamo(fromUuid.get(), chunkSize)
: accounts.getAllFrom(fromUuid.get(), chunkSize);
return accounts.getAllFromDynamo(fromUuid.get(), chunkSize);
}
return useDynamo
? accounts.getAllFromDynamo(chunkSize)
: accounts.getAllFrom(chunkSize);
return accounts.getAllFromDynamo(chunkSize);
}
}
private Optional<UUID> getLastUuid(final boolean useDynamo) {
if (useDynamo) {
return cache.getLastUuidDynamo();
} else {
return cache.getLastUuid();
}
private Optional<UUID> getLastUuid() {
return cache.getLastUuidDynamo();
}
private void cacheLastUuid(final Optional<UUID> lastUuid, final boolean useDynamo) {
if (useDynamo) {
cache.setLastUuidDynamo(lastUuid);
} else {
cache.setLastUuid(lastUuid);
}
}
public void setDedicatedDynamoMigrationCrawler(final boolean dedicatedDynamoMigrationCrawler) {
this.dedicatedDynamoMigrationCrawler = dedicatedDynamoMigrationCrawler;
private void cacheLastUuid(final Optional<UUID> lastUuid) {
cache.setLastUuidDynamo(lastUuid);
}
private synchronized void sleepWhileRunning(long delayMs) {
if (running.get()) Util.wait(this, delayMs);
if (running.get()) {
Util.wait(this, delayMs);
}
}
}

View File

@@ -27,8 +27,6 @@ public class AccountDatabaseCrawlerCache {
private final FaultTolerantRedisCluster cacheCluster;
private final ClusterLuaScript unlockClusterScript;
private String prefix = "";
public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster) throws IOException {
this.cacheCluster = cacheCluster;
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua",
@@ -37,9 +35,9 @@ public class AccountDatabaseCrawlerCache {
public void setAccelerated(final boolean accelerated) {
if (accelerated) {
cacheCluster.useCluster(connection -> connection.sync().set(getPrefixedKey(ACCELERATE_KEY), "1"));
cacheCluster.useCluster(connection -> connection.sync().set(ACCELERATE_KEY, "1"));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(ACCELERATE_KEY)));
cacheCluster.useCluster(connection -> connection.sync().del(ACCELERATE_KEY));
}
}
@@ -49,16 +47,16 @@ public class AccountDatabaseCrawlerCache {
public boolean claimActiveWork(String workerId, long ttlMs) {
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync()
.set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs))));
.set(ACTIVE_WORKER_KEY, workerId, SetArgs.Builder.nx().px(ttlMs))));
}
public void releaseActiveWork(String workerId) {
unlockClusterScript.execute(List.of(getPrefixedKey(ACTIVE_WORKER_KEY)), List.of(workerId));
unlockClusterScript.execute(List.of(ACTIVE_WORKER_KEY), List.of(workerId));
}
public Optional<UUID> getLastUuid() {
final String lastUuidString = cacheCluster.withCluster(
connection -> connection.sync().get(getPrefixedKey(LAST_UUID_KEY)));
connection -> connection.sync().get(LAST_UUID_KEY));
if (lastUuidString == null) {
return Optional.empty();
@@ -70,15 +68,15 @@ public class AccountDatabaseCrawlerCache {
public void setLastUuid(Optional<UUID> lastUuid) {
if (lastUuid.isPresent()) {
cacheCluster.useCluster(connection -> connection.sync()
.psetex(getPrefixedKey(LAST_UUID_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_KEY)));
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_KEY));
}
}
public Optional<UUID> getLastUuidDynamo() {
final String lastUuidString = cacheCluster.withCluster(
connection -> connection.sync().get(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
connection -> connection.sync().get(LAST_UUID_DYNAMO_KEY));
if (lastUuidString == null) {
return Optional.empty();
@@ -91,21 +89,9 @@ public class AccountDatabaseCrawlerCache {
if (lastUuid.isPresent()) {
cacheCluster.useCluster(
connection -> connection.sync()
.psetex(getPrefixedKey(LAST_UUID_DYNAMO_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
.psetex(LAST_UUID_DYNAMO_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
} else {
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
cacheCluster.useCluster(connection -> connection.sync().del(LAST_UUID_DYNAMO_KEY));
}
}
private String getPrefixedKey(final String key) {
return prefix + key;
}
/**
* Set a cache key prefix, allowing for uses beyond the canonical crawler
*/
public void setPrefix(final String prefix) {
this.prefix = prefix + "::";
}
}

View File

@@ -1,172 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
public class Accounts implements AccountStore {
public static final String ID = "id";
public static final String UID = "uuid";
public static final String NUMBER = "number";
public static final String DATA = "data";
public static final String VERSION = "version";
private static final ObjectMapper mapper = SystemMapper.getMapper();
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer createTimer = metricRegistry.timer(name(Accounts.class, "create" ));
private final Timer updateTimer = metricRegistry.timer(name(Accounts.class, "update" ));
private final Timer getByNumberTimer = metricRegistry.timer(name(Accounts.class, "getByNumber" ));
private final Timer getByUuidTimer = metricRegistry.timer(name(Accounts.class, "getByUuid" ));
private final Timer getAllFromTimer = metricRegistry.timer(name(Accounts.class, "getAllFrom" ));
private final Timer getAllFromOffsetTimer = metricRegistry.timer(name(Accounts.class, "getAllFromOffset"));
private final Timer deleteTimer = metricRegistry.timer(name(Accounts.class, "delete" ));
private final Timer vacuumTimer = metricRegistry.timer(name(Accounts.class, "vacuum" ));
private final FaultTolerantDatabase database;
public Accounts(FaultTolerantDatabase database) {
this.database = database;
this.database.getDatabase().registerRowMapper(new AccountRowMapper());
}
@Override
public boolean create(Account account) {
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = createTimer.time()) {
final Map<String, Object> resultMap = handle.createQuery("INSERT INTO accounts (" + NUMBER + ", " + UID + ", " + DATA + ") VALUES (:number, :uuid, CAST(:data AS json)) ON CONFLICT(number) DO UPDATE SET " + DATA + " = EXCLUDED.data, " + VERSION + " = accounts.version + 1 RETURNING uuid, version")
.bind("number", account.getNumber())
.bind("uuid", account.getUuid())
.bind("data", mapper.writeValueAsString(account))
.mapToMap()
.findOnly();
final UUID uuid = (UUID) resultMap.get(UID);
final int version = (int) resultMap.get(VERSION);
boolean isNew = uuid.equals(account.getUuid());
account.setUuid(uuid);
account.setVersion(version);
return isNew;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}));
}
@Override
public void update(Account account) throws ContestedOptimisticLockException {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = updateTimer.time()) {
final int newVersion = account.getVersion() + 1;
int rowsModified = handle.createUpdate("UPDATE accounts SET " + DATA + " = CAST(:data AS json), " + VERSION + " = :newVersion WHERE " + UID + " = :uuid AND " + VERSION + " = :version")
.bind("uuid", account.getUuid())
.bind("data", mapper.writeValueAsString(account))
.bind("version", account.getVersion())
.bind("newVersion", newVersion)
.execute();
if (rowsModified == 0) {
throw new ContestedOptimisticLockException();
}
account.setVersion(newVersion);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}));
}
@Override
public Optional<Account> get(String number) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getByNumberTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " = :number")
.bind("number", number)
.mapTo(Account.class)
.findFirst();
}
}));
}
@Override
public Optional<Account> get(UUID uuid) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getByUuidTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " = :uuid")
.bind("uuid", uuid)
.mapTo(Account.class)
.findFirst();
}
}));
}
public AccountCrawlChunk getAllFrom(UUID from, int length) {
final List<Account> accounts = database.with(jdbi -> jdbi.withHandle(handle -> {
try (Context ignored = getAllFromOffsetTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " > :from ORDER BY " + UID + " LIMIT :limit")
.bind("from", from)
.bind("limit", length)
.mapTo(Account.class)
.list();
}
}));
return buildChunkForAccounts(accounts);
}
public AccountCrawlChunk getAllFrom(int length) {
final List<Account> accounts = database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromTimer.time()) {
return handle.createQuery("SELECT * FROM accounts ORDER BY " + UID + " LIMIT :limit")
.bind("limit", length)
.mapTo(Account.class)
.list();
}
}));
return buildChunkForAccounts(accounts);
}
private AccountCrawlChunk buildChunkForAccounts(final List<Account> accounts) {
return new AccountCrawlChunk(accounts, accounts.isEmpty() ? null : accounts.get(accounts.size() - 1).getUuid());
}
@Override
public void delete(final UUID uuid) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = deleteTimer.time()) {
handle.createUpdate("DELETE FROM accounts WHERE " + UID + " = :uuid")
.bind("uuid", uuid)
.execute();
}
}));
}
public void vacuum() {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = vacuumTimer.time()) {
handle.execute("VACUUM accounts");
}
}));
}
}

View File

@@ -8,7 +8,6 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
@@ -17,16 +16,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
@@ -59,12 +52,6 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
static final String ATTR_CANONICALLY_DISCOVERABLE = "C";
private final DynamoDbClient client;
private final DynamoDbAsyncClient asyncClient;
private final ThreadPoolExecutor migrationThreadPool;
private final MigrationDeletedAccounts migrationDeletedAccounts;
private final MigrationRetryAccounts migrationRetryAccounts;
private final String phoneNumbersTableName;
private final String accountsTableName;
@@ -76,26 +63,15 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFrom"));
private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getAllFromOffset"));
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
private static final Timer DELETE_RECENTLY_DELETED_UUIDS_TIMER = Metrics.timer(
name(AccountsDynamoDb.class, "deleteRecentlyDeletedUuids"));
private final Logger logger = LoggerFactory.getLogger(AccountsDynamoDb.class);
public AccountsDynamoDb(DynamoDbClient client, DynamoDbAsyncClient asyncClient,
ThreadPoolExecutor migrationThreadPool, String accountsTableName, String phoneNumbersTableName,
MigrationDeletedAccounts migrationDeletedAccounts,
MigrationRetryAccounts accountsMigrationErrors) {
public AccountsDynamoDb(DynamoDbClient client, String accountsTableName, String phoneNumbersTableName) {
super(client);
this.client = client;
this.asyncClient = asyncClient;
this.phoneNumbersTableName = phoneNumbersTableName;
this.accountsTableName = accountsTableName;
this.migrationThreadPool = migrationThreadPool;
this.migrationDeletedAccounts = migrationDeletedAccounts;
this.migrationRetryAccounts = accountsMigrationErrors;
}
@Override
@@ -215,29 +191,19 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
}
try {
try {
UpdateItemResponse response = client.updateItem(updateItemRequest);
UpdateItemResponse response = client.updateItem(updateItemRequest);
account.setVersion(AttributeValues.getInt(response.attributes(), "V", account.getVersion() + 1));
} catch (final TransactionConflictException e) {
account.setVersion(AttributeValues.getInt(response.attributes(), "V", account.getVersion() + 1));
} catch (final TransactionConflictException e) {
throw new ContestedOptimisticLockException();
throw new ContestedOptimisticLockException();
} catch (final ConditionalCheckFailedException e) {
} catch (final ConditionalCheckFailedException e) {
// the exception doesnt give details about which condition failed,
// but we can infer it was an optimistic locking failure if the UUID is known
throw get(account.getUuid()).isPresent() ? new ContestedOptimisticLockException() : e;
}
} catch (final Exception e) {
if (!(e instanceof ContestedOptimisticLockException)) {
// the Dynamo account now lags the Postgres account version. Put it in the migration retry table so that it will
// get updated faster—otherwise it will be stale until the accounts crawler runs again
migrationRetryAccounts.put(account.getUuid());
}
throw e;
// the exception doesnt give details about which condition failed,
// but we can infer it was an optimistic locking failure if the UUID is known
throw get(account.getUuid()).isPresent() ? new ContestedOptimisticLockException() : e;
}
});
}
@@ -279,10 +245,6 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
DELETE_TIMER.record(() -> delete(uuid, true));
}
public void deleteInvalidMigration(UUID uuid) {
DELETE_TIMER.record(() -> delete(uuid, false));
}
public AccountCrawlChunk getAllFrom(final UUID from, final int maxCount, final int pageSize) {
final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
.limit(pageSize)
@@ -312,10 +274,6 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
private void delete(UUID uuid, boolean saveInDeletedAccountsTable) {
if (saveInDeletedAccountsTable) {
migrationDeletedAccounts.put(uuid);
}
Optional<Account> maybeAccount = get(uuid);
maybeAccount.ifPresent(account -> {
@@ -341,105 +299,6 @@ public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountSt
});
}
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "count"));
private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDb.class, "migration", "error"));
public CompletableFuture<Void> migrate(List<Account> accounts, int threads) {
if (threads > migrationThreadPool.getMaximumPoolSize()) {
migrationThreadPool.setMaximumPoolSize(threads);
migrationThreadPool.setCorePoolSize(threads);
} else {
migrationThreadPool.setCorePoolSize(threads);
migrationThreadPool.setMaximumPoolSize(threads);
}
final List<CompletableFuture<?>> futures = accounts.stream()
.map(this::migrate)
.map(f -> f.whenCompleteAsync((migrated, e) -> {
if (e == null) {
MIGRATED_COUNTER.increment(migrated ? 1 : 0);
} else {
ERROR_COUNTER.increment();
}
}, migrationThreadPool))
.collect(Collectors.toList());
CompletableFuture<Void> migrationBatch = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
return migrationBatch.whenCompleteAsync((result, exception) -> {
if (exception != null) {
logger.warn("Exception migrating batch", exception);
}
deleteRecentlyDeletedUuids();
}, migrationThreadPool);
}
public void deleteRecentlyDeletedUuids() {
DELETE_RECENTLY_DELETED_UUIDS_TIMER.record(() -> {
final List<UUID> recentlyDeletedUuids = migrationDeletedAccounts.getRecentlyDeletedUuids();
for (UUID recentlyDeletedUuid : recentlyDeletedUuids) {
delete(recentlyDeletedUuid, false);
}
migrationDeletedAccounts.delete(recentlyDeletedUuids);
});
}
public CompletableFuture<Boolean> migrate(Account account) {
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid(), Put.builder()
.conditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)")
.expressionAttributeNames(Map.of(
"#uuid", KEY_ACCOUNT_UUID,
"#version", ATTR_VERSION))
.expressionAttributeValues(Map.of(
":version", AttributeValues.fromInt(account.getVersion()))));
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
.transactItems(phoneNumberConstraintPut, accountPut).build();
final CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
asyncClient.transactWriteItems(request).whenCompleteAsync((result, exception) -> {
if (result != null) {
resultFuture.complete(true);
return;
}
if (exception instanceof CompletionException) {
// whenCompleteAsync can wrap exceptions in a CompletionException; unwrap it to get to the root cause.
exception = exception.getCause();
}
if (exception instanceof TransactionCanceledException) {
// account is already migrated
resultFuture.complete(false);
return;
}
try {
migrationRetryAccounts.put(account.getUuid());
} catch (final Exception e) {
logger.error("Could not store account {}", account.getUuid());
}
resultFuture.completeExceptionally(exception);
}, migrationThreadPool);
return resultFuture;
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
void putUuidForMigrationRetry(final UUID uuid) {
try {
migrationRetryAccounts.put(uuid);
} catch (final Exception e) {
logger.error("Failed to store for retry: {}", uuid, e);
}
}
private static String extractCancellationReasonCodes(final TransactionCanceledException exception) {
return exception.cancellationReasons().stream()
.map(CancellationReason::code)

View File

@@ -1,41 +0,0 @@
package org.whispersystems.textsecuregcm.storage;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener {
private final AccountsDynamoDb accountsDynamoDb;
private final DynamicConfigurationManager dynamicConfigurationManager;
public AccountsDynamoDbMigrator(final AccountsDynamoDb accountsDynamoDb, final DynamicConfigurationManager dynamicConfigurationManager) {
this.accountsDynamoDb = accountsDynamoDb;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
public void onCrawlStart() {
}
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
}
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isBackgroundMigrationEnabled()) {
return;
}
final CompletableFuture<Void> migrationBatch = accountsDynamoDb.migrate(chunkAccounts,
dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().getBackgroundMigrationExecutorThreads());
migrationBatch.join();
}
}

View File

@@ -11,22 +11,17 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -37,7 +32,6 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials;
import org.whispersystems.textsecuregcm.controllers.AccountController;
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
@@ -69,20 +63,14 @@ public class AccountsManager {
private static final String COUNTRY_CODE_TAG_NAME = "country";
private static final String DELETION_REASON_TAG_NAME = "reason";
private static final String DYNAMO_MIGRATION_ERROR_COUNTER_NAME = name(AccountsManager.class, "migration", "error");
private static final Counter DYNAMO_MIGRATION_COMPARISON_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "comparisons"));
private static final String DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME = name(AccountsManager.class, "migration", "mismatches");
private final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
private final Accounts accounts;
private final AccountsDynamoDb accountsDynamoDb;
private final FaultTolerantRedisCluster cacheCluster;
private final DeletedAccountsManager deletedAccountsManager;
private final DirectoryQueue directoryQueue;
private final KeysDynamoDb keysDynamoDb;
private final MessagesManager messagesManager;
private final MigrationMismatchedAccounts mismatchedAccounts;
private final UsernamesManager usernamesManager;
private final ProfilesManager profilesManager;
private final StoredVerificationCodeManager pendingAccounts;
@@ -90,10 +78,7 @@ public class AccountsManager {
private final SecureBackupClient secureBackupClient;
private final ObjectMapper mapper;
private final ObjectMapper migrationComparisonMapper;
private final DynamicConfigurationManager dynamicConfigurationManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
public enum DeletionReason {
ADMIN_DELETED("admin"),
@@ -107,25 +92,22 @@ public class AccountsManager {
}
}
public AccountsManager(Accounts accounts, AccountsDynamoDb accountsDynamoDb, FaultTolerantRedisCluster cacheCluster,
public AccountsManager(AccountsDynamoDb accountsDynamoDb, FaultTolerantRedisCluster cacheCluster,
final DeletedAccountsManager deletedAccountsManager,
final DirectoryQueue directoryQueue,
final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager,
final MigrationMismatchedAccounts mismatchedAccounts, final UsernamesManager usernamesManager,
final UsernamesManager usernamesManager,
final ProfilesManager profilesManager,
final StoredVerificationCodeManager pendingAccounts,
final SecureStorageClient secureStorageClient,
final SecureBackupClient secureBackupClient,
final ExperimentEnrollmentManager experimentEnrollmentManager,
final DynamicConfigurationManager dynamicConfigurationManager) {
this.accounts = accounts;
this.accountsDynamoDb = accountsDynamoDb;
this.accountsDynamoDb = accountsDynamoDb;
this.cacheCluster = cacheCluster;
this.deletedAccountsManager = deletedAccountsManager;
this.directoryQueue = directoryQueue;
this.keysDynamoDb = keysDynamoDb;
this.messagesManager = messagesManager;
this.mismatchedAccounts = mismatchedAccounts;
this.usernamesManager = usernamesManager;
this.profilesManager = profilesManager;
this.pendingAccounts = pendingAccounts;
@@ -133,11 +115,7 @@ public class AccountsManager {
this.secureBackupClient = secureBackupClient;
this.mapper = SystemMapper.getMapper();
this.migrationComparisonMapper = mapper.copy();
migrationComparisonMapper.addMixIn(Device.class, DeviceComparisonMixin.class);
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
public Account create(final String number,
@@ -170,36 +148,12 @@ public class AccountsManager {
final UUID originalUuid = account.getUuid();
boolean freshUser = primaryCreate(account);
boolean freshUser = dynamoCreate(account);
// create() sometimes updates the UUID, if there was a number conflict.
// for metrics, we want secondary to run with the same original UUID
final UUID actualUuid = account.getUuid();
try {
if (secondaryWriteEnabled()) {
account.setUuid(originalUuid);
runSafelyAndRecordMetrics(() -> secondaryCreate(account), Optional.of(account.getUuid()), freshUser,
(primaryResult, secondaryResult) -> {
if (primaryResult.equals(secondaryResult)) {
return Optional.empty();
}
if (secondaryResult) {
return Optional.of("secondaryFreshUser");
}
return Optional.of("primaryFreshUser");
},
"create");
}
} finally {
account.setUuid(actualUuid);
}
redisSet(account);
pendingAccounts.remove(number);
@@ -293,26 +247,7 @@ public class AccountsManager {
final UUID uuid = account.getUuid();
updatedAccount = updateWithRetries(account, updater, this::primaryUpdate, () -> primaryGet(uuid).get());
if (secondaryWriteEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(uuid).map(secondaryAccount -> {
try {
return updateWithRetries(secondaryAccount, updater, this::secondaryUpdate, () -> secondaryGet(uuid).get());
} catch (final OptimisticLockRetryLimitExceededException e) {
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isDynamoPrimary()) {
accountsDynamoDb.putUuidForMigrationRetry(uuid);
}
throw e;
}
}),
Optional.of(uuid),
Optional.of(updatedAccount),
this::compareAccounts,
"update");
}
updatedAccount = updateWithRetries(account, updater, this::dynamoUpdate, () -> dynamoGet(uuid).get());
redisSet(updatedAccount);
}
@@ -378,14 +313,9 @@ public class AccountsManager {
try (Timer.Context ignored = getByNumberTimer.time()) {
Optional<Account> account = redisGet(number);
if (!account.isPresent()) {
account = primaryGet(number);
account.ifPresent(value -> redisSet(value));
if (secondaryReadEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(number), Optional.empty(), account, this::compareAccounts,
"getByNumber");
}
if (account.isEmpty()) {
account = dynamoGet(number);
account.ifPresent(this::redisSet);
}
return account;
@@ -396,29 +326,15 @@ public class AccountsManager {
try (Timer.Context ignored = getByUuidTimer.time()) {
Optional<Account> account = redisGet(uuid);
if (!account.isPresent()) {
account = primaryGet(uuid);
account.ifPresent(value -> redisSet(value));
if (secondaryReadEnabled()) {
runSafelyAndRecordMetrics(() -> secondaryGet(uuid), Optional.of(uuid), account, this::compareAccounts,
"getByUuid");
}
if (account.isEmpty()) {
account = dynamoGet(uuid);
account.ifPresent(this::redisSet);
}
return account;
}
}
public AccountCrawlChunk getAllFrom(int length) {
return accounts.getAllFrom(length);
}
public AccountCrawlChunk getAllFrom(UUID uuid, int length) {
return accounts.getAllFrom(uuid, length);
}
public AccountCrawlChunk getAllFromDynamo(int length) {
final int maxPageSize = dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.getDynamoCrawlerScanPageSize();
@@ -447,16 +363,7 @@ public class AccountsManager {
deleteBackupServiceDataFuture.join();
redisDelete(account);
primaryDelete(account);
if (secondaryDeleteEnabled()) {
try {
secondaryDelete(account);
} catch (final Exception e) {
logger.error("Could not delete account {} from secondary", account.getUuid().toString());
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", "delete").increment();
}
}
dynamoDelete(account);
return account.getUuid();
});
@@ -537,100 +444,6 @@ public class AccountsManager {
}
}
private Optional<Account> primaryGet(String number) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoGet(number) :
databaseGet(number);
}
private Optional<Account> secondaryGet(String number) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseGet(number) :
dynamoGet(number);
}
private Optional<Account> primaryGet(UUID uuid) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoGet(uuid) :
databaseGet(uuid);
}
private Optional<Account> secondaryGet(UUID uuid) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseGet(uuid) :
dynamoGet(uuid);
}
private boolean primaryCreate(Account account) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
dynamoCreate(account) :
databaseCreate(account);
}
private boolean secondaryCreate(Account account) {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()
?
databaseCreate(account) :
dynamoCreate(account);
}
private void primaryUpdate(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
dynamoUpdate(account);
} else {
databaseUpdate(account);
}
}
private void secondaryUpdate(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
databaseUpdate(account);
} else {
dynamoUpdate(account);
}
}
private void primaryDelete(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
dynamoDelete(account);
} else {
databaseDelete(account);
}
}
private void secondaryDelete(Account account) {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDynamoPrimary()) {
databaseDelete(account);
} else {
dynamoDelete(account);
}
}
private Optional<Account> databaseGet(String number) {
return accounts.get(number);
}
private Optional<Account> databaseGet(UUID uuid) {
return accounts.get(uuid);
}
private boolean databaseCreate(Account account) {
return accounts.create(account);
}
private void databaseUpdate(Account account) {
accounts.update(account);
}
private void databaseDelete(final Account account) {
accounts.delete(account.getUuid());
}
private Optional<Account> dynamoGet(String number) {
return accountsDynamoDb.get(number);
}
@@ -651,175 +464,14 @@ public class AccountsManager {
accountsDynamoDb.delete(account.getUuid());
}
private boolean secondaryDeleteEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled();
}
private boolean secondaryReadEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isReadEnabled();
}
private boolean secondaryWriteEnabled() {
return secondaryDeleteEnabled()
&& dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isWriteEnabled();
}
// TODO delete
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Deprecated
public Optional<String> compareAccounts(final Optional<Account> maybePrimaryAccount,
final Optional<Account> maybeSecondaryAccount) {
if (maybePrimaryAccount.isEmpty() && maybeSecondaryAccount.isEmpty()) {
return Optional.empty();
}
if (maybePrimaryAccount.isEmpty()) {
return Optional.of("primaryMissing");
}
if (maybeSecondaryAccount.isEmpty()) {
return Optional.of("secondaryMissing");
}
final Account primaryAccount = maybePrimaryAccount.get();
final Account secondaryAccount = maybeSecondaryAccount.get();
final int uuidCompare = primaryAccount.getUuid().compareTo(secondaryAccount.getUuid());
if (uuidCompare != 0) {
return Optional.of("uuid");
}
final int numberCompare = primaryAccount.getNumber().compareTo(secondaryAccount.getNumber());
if (numberCompare != 0) {
return Optional.of("number");
}
if (!Objects.equals(primaryAccount.getIdentityKey(), secondaryAccount.getIdentityKey())) {
return Optional.of("identityKey");
}
if (!Objects.equals(primaryAccount.getCurrentProfileVersion(), secondaryAccount.getCurrentProfileVersion())) {
return Optional.of("currentProfileVersion");
}
if (!Objects.equals(primaryAccount.getProfileName(), secondaryAccount.getProfileName())) {
return Optional.of("profileName");
}
if (!Objects.equals(primaryAccount.getAvatar(), secondaryAccount.getAvatar())) {
return Optional.of("avatar");
}
if (!Objects.equals(primaryAccount.getUnidentifiedAccessKey(), secondaryAccount.getUnidentifiedAccessKey())) {
if (primaryAccount.getUnidentifiedAccessKey().isPresent() && secondaryAccount.getUnidentifiedAccessKey()
.isPresent()) {
if (Arrays.compare(primaryAccount.getUnidentifiedAccessKey().get(),
secondaryAccount.getUnidentifiedAccessKey().get()) != 0) {
return Optional.of("unidentifiedAccessKey");
}
} else {
return Optional.of("unidentifiedAccessKey");
}
}
if (!Objects.equals(primaryAccount.isUnrestrictedUnidentifiedAccess(),
secondaryAccount.isUnrestrictedUnidentifiedAccess())) {
return Optional.of("unrestrictedUnidentifiedAccess");
}
if (!Objects.equals(primaryAccount.isDiscoverableByPhoneNumber(), secondaryAccount.isDiscoverableByPhoneNumber())) {
return Optional.of("discoverableByPhoneNumber");
}
if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) {
if (!Objects.equals(primaryAccount.getMasterDevice().get().getSignedPreKey(),
secondaryAccount.getMasterDevice().get().getSignedPreKey())) {
return Optional.of("masterDeviceSignedPreKey");
}
}
try {
if (!serializedEquals(primaryAccount.getDevices(), secondaryAccount.getDevices())) {
return Optional.of("devices");
}
if (primaryAccount.getVersion() != secondaryAccount.getVersion()) {
return Optional.of("version");
}
if (primaryAccount.getMasterDevice().isPresent() && secondaryAccount.getMasterDevice().isPresent()) {
if (Math.abs(primaryAccount.getMasterDevice().get().getPushTimestamp() -
secondaryAccount.getMasterDevice().get().getPushTimestamp()) > 60 * 1_000L) {
// These are generally few milliseconds off, because the setter uses System.currentTimeMillis() internally,
// but we can be more relaxed
return Optional.of("masterDevicePushTimestamp");
}
}
if (!serializedEquals(primaryAccount, secondaryAccount)) {
return Optional.of("serialization");
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return Optional.empty();
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T primaryResult,
final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action) {
if (maybeUuid.isPresent()) {
// the only time we dont have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and
// it will also be gated by the global readEnabled configuration
final boolean enrolled = experimentEnrollmentManager.isEnrolled(maybeUuid.get(), "accountsDynamoDbMigration");
if (!enrolled) {
return;
}
}
try {
final T secondaryResult = callable.call();
compare(primaryResult, secondaryResult, mismatchClassifier, action, maybeUuid);
} catch (final Exception e) {
logger.error("Error running " + action + " in Dynamo", e);
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER_NAME, "action", action).increment();
}
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void compare(final T primaryResult, final T secondaryResult,
final BiFunction<T, T, Optional<String>> mismatchClassifier, final String action,
final Optional<UUID> maybeUUid) {
DYNAMO_MIGRATION_COMPARISON_COUNTER.increment();
mismatchClassifier.apply(primaryResult, secondaryResult)
.ifPresent(mismatchType -> {
final String mismatchDescription = action + ":" + mismatchType;
Metrics.counter(DYNAMO_MIGRATION_MISMATCH_COUNTER_NAME,
"mismatchType", mismatchDescription)
.increment();
maybeUUid.ifPresent(uuid -> {
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isPostCheckMismatches()) {
mismatchedAccounts.put(uuid);
}
});
});
}
private String getAbbreviatedCallChain(final StackTraceElement[] stackTrace) {
return Arrays.stream(stackTrace)
.filter(stackTraceElement -> stackTraceElement.getClassName().contains("org.whispersystems"))
@@ -827,22 +479,4 @@ public class AccountsManager {
.map(stackTraceElement -> StringUtils.substringAfterLast(stackTraceElement.getClassName(), ".") + ":" + stackTraceElement.getMethodName())
.collect(Collectors.joining(" -> "));
}
private static abstract class DeviceComparisonMixin extends Device {
@JsonIgnore
private long lastSeen;
@JsonIgnore
private long pushTimestamp;
}
private boolean serializedEquals(final Object primary, final Object secondary) throws JsonProcessingException {
final byte[] primarySerialized = migrationComparisonMapper.writeValueAsBytes(primary);
final byte[] secondarySerialized = migrationComparisonMapper.writeValueAsBytes(secondary);
final int serializeCompare = Arrays.compare(primarySerialized, secondarySerialized);
return serializeCompare == 0;
}
}

View File

@@ -1,71 +0,0 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MigrationDeletedAccounts extends AbstractDynamoDbStore {
private final String tableName;
static final String KEY_UUID = "U";
public MigrationDeletedAccounts(DynamoDbClient dynamoDb, String tableName) {
super(dynamoDb);
this.tableName = tableName;
}
public void put(UUID uuid) {
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(primaryKey(uuid))
.build());
}
public List<UUID> getRecentlyDeletedUuids() {
final List<UUID> uuids = new ArrayList<>();
Optional<ScanResponse> firstPage = db().scanPaginator(ScanRequest.builder()
.tableName(tableName)
.build()).stream().findAny(); // get the first available response
if (firstPage.isPresent()) {
for (Map<String, AttributeValue> item : firstPage.get().items()) {
// only process one page each time. If we have a significant backlog at the end of the migration
// we can handle it separately
uuids.add(AttributeValues.getUUID(item, KEY_UUID, null));
}
}
return uuids;
}
public void delete(List<UUID> uuids) {
writeInBatches(uuids, (batch) -> {
List<WriteRequest> deletes = batch.stream().map((uuid) -> WriteRequest.builder().deleteRequest(DeleteRequest.builder()
.key(primaryKey(uuid))
.build()).build()).collect(Collectors.toList());
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
});
}
@VisibleForTesting
public static Map<String, AttributeValue> primaryKey(UUID uuid) {
return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid));
}
}

View File

@@ -1,111 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
public class MigrationMismatchedAccounts extends AbstractDynamoDbStore {
static final String KEY_UUID = "U";
static final String ATTR_TIMESTAMP = "T";
@VisibleForTesting
static final long MISMATCH_CHECK_DELAY_MILLIS = Duration.ofMinutes(1).toMillis();
private final String tableName;
private final Clock clock;
public void put(UUID uuid) {
final Map<String, AttributeValue> item = primaryKey(uuid);
item.put("T", AttributeValues.fromLong(clock.millis()));
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(item)
.build());
}
public MigrationMismatchedAccounts(DynamoDbClient dynamoDb, String tableName) {
this(dynamoDb, tableName, Clock.systemUTC());
}
@VisibleForTesting
MigrationMismatchedAccounts(DynamoDbClient dynamoDb, String tableName, final Clock clock) {
super(dynamoDb);
this.tableName = tableName;
this.clock = clock;
}
/**
* returns a list of UUIDs stored in the table that have passed {@link #MISMATCH_CHECK_DELAY_MILLIS}
*/
public List<UUID> getUuids(int max) {
final List<UUID> uuids = new ArrayList<>();
final ScanIterable scanPaginator = db().scanPaginator(ScanRequest.builder()
.tableName(tableName)
.filterExpression("#timestamp <= :timestamp")
.expressionAttributeNames(Map.of("#timestamp", ATTR_TIMESTAMP))
.expressionAttributeValues(Map.of(":timestamp",
AttributeValues.fromLong(clock.millis() - MISMATCH_CHECK_DELAY_MILLIS)))
.build());
for (ScanResponse response : scanPaginator) {
for (Map<String, AttributeValue> item : response.items()) {
uuids.add(AttributeValues.getUUID(item, KEY_UUID, null));
if (uuids.size() >= max) {
break;
}
}
if (uuids.size() >= max) {
break;
}
}
return uuids;
}
@VisibleForTesting
public static Map<String, AttributeValue> primaryKey(UUID uuid) {
final HashMap<String, AttributeValue> item = new HashMap<>();
item.put(KEY_UUID, AttributeValues.fromUUID(uuid));
return item;
}
public void delete(final List<UUID> uuidsToDelete) {
writeInBatches(uuidsToDelete, (uuids -> {
final List<WriteRequest> deletes = uuids.stream()
.map(uuid -> WriteRequest.builder().deleteRequest(
DeleteRequest.builder().key(Map.of(KEY_UUID, AttributeValues.fromUUID(uuid))).build()).build())
.collect(Collectors.toList());
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
}));
}
}

View File

@@ -1,112 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import net.logstash.logback.argument.StructuredArguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class MigrationMismatchedAccountsTableCrawler extends ManagedPeriodicWork {
private static final Logger logger = LoggerFactory.getLogger(MigrationMismatchedAccountsTableCrawler.class);
private static final Duration WORKER_TTL = Duration.ofMinutes(2);
private static final Duration RUN_INTERVAL = Duration.ofMinutes(1);
private static final String ACTIVE_WORKER_KEY = "migration_mismatched_accounts_crawler_cache_active_worker";
private static final int MAX_BATCH_SIZE = 5_000;
private static final Counter COMPARISONS_COUNTER = Metrics.counter(
name(MigrationMismatchedAccountsTableCrawler.class, "comparisons"));
private static final String MISMATCH_COUNTER_NAME = name(MigrationMismatchedAccountsTableCrawler.class, "mismatches");
private static final Counter ERRORS_COUNTER = Metrics.counter(
name(MigrationMismatchedAccountsTableCrawler.class, "errors"));
private final MigrationMismatchedAccounts mismatchedAccounts;
private final AccountsManager accountsManager;
private final Accounts accountsDb;
private final AccountsDynamoDb accountsDynamoDb;
private final DynamicConfigurationManager dynamicConfigurationManager;
public MigrationMismatchedAccountsTableCrawler(
final MigrationMismatchedAccounts mismatchedAccounts,
final AccountsManager accountsManager,
final Accounts accountsDb,
final AccountsDynamoDb accountsDynamoDb,
final DynamicConfigurationManager dynamicConfigurationManager,
final FaultTolerantRedisCluster cluster,
final ScheduledExecutorService executorService) throws IOException {
super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService);
this.mismatchedAccounts = mismatchedAccounts;
this.accountsManager = accountsManager;
this.accountsDb = accountsDb;
this.accountsDynamoDb = accountsDynamoDb;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
public void doPeriodicWork() {
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isPostCheckMismatches()) {
return;
}
final List<UUID> uuids = this.mismatchedAccounts.getUuids(MAX_BATCH_SIZE);
final List<UUID> processedUuids = new ArrayList<>(uuids.size());
try {
for (UUID uuid : uuids) {
try {
final Optional<String> result = accountsManager.compareAccounts(accountsDb.get(uuid),
accountsDynamoDb.get(uuid));
COMPARISONS_COUNTER.increment();
result.ifPresent(mismatchType -> {
Metrics.counter(MISMATCH_COUNTER_NAME, "type", mismatchType)
.increment();
if (dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration()
.isLogMismatches()) {
logger.info("Mismatch: {}", StructuredArguments.entries(Map.of(
"type", mismatchType,
"uuid", uuid)));
}
});
processedUuids.add(uuid);
} catch (final Exception e) {
ERRORS_COUNTER.increment();
logger.warn("Failed to check account mismatch", e);
}
}
} finally {
this.mismatchedAccounts.delete(processedUuids);
}
}
}

View File

@@ -1,76 +0,0 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class MigrationRetryAccounts extends AbstractDynamoDbStore {
private final String tableName;
static final String KEY_UUID = "U";
public MigrationRetryAccounts(DynamoDbClient dynamoDb, String tableName) {
super(dynamoDb);
this.tableName = tableName;
}
public void put(UUID uuid) {
db().putItem(PutItemRequest.builder()
.tableName(tableName)
.item(primaryKey(uuid))
.build());
}
public List<UUID> getUuids(int max) {
final List<UUID> uuids = new ArrayList<>();
for (ScanResponse response : db().scanPaginator(ScanRequest.builder().tableName(tableName).build())) {
for (Map<String, AttributeValue> item : response.items()) {
uuids.add(AttributeValues.getUUID(item, KEY_UUID, null));
if (uuids.size() >= max) {
break;
}
}
if (uuids.size() >= max) {
break;
}
}
return uuids;
}
@VisibleForTesting
public static Map<String, AttributeValue> primaryKey(UUID uuid) {
return Map.of(KEY_UUID, AttributeValues.fromUUID(uuid));
}
public void delete(final List<UUID> uuidsToDelete) {
writeInBatches(uuidsToDelete, (uuids -> {
final List<WriteRequest> deletes = uuids.stream()
.map(uuid -> WriteRequest.builder().deleteRequest(
DeleteRequest.builder().key(Map.of(KEY_UUID, AttributeValues.fromUUID(uuid))).build()).build())
.collect(Collectors.toList());
executeTableWriteItemsUntilComplete(Map.of(tableName, deletes));
}));
}
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
public class MigrationRetryAccountsTableCrawler extends ManagedPeriodicWork {
private static final Logger logger = LoggerFactory.getLogger(MigrationRetryAccountsTableCrawler.class);
private static final Duration WORKER_TTL = Duration.ofMinutes(2);
private static final Duration RUN_INTERVAL = Duration.ofMinutes(15);
private static final String ACTIVE_WORKER_KEY = "migration_retry_accounts_crawler_cache_active_worker";
private static final int MAX_BATCH_SIZE = 5_000;
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "migrated"));
private static final Counter ERROR_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "error"));
private static final Counter TOTAL_COUNTER = Metrics.counter(name(MigrationRetryAccountsTableCrawler.class, "total"));
private final MigrationRetryAccounts retryAccounts;
private final AccountsManager accountsManager;
private final AccountsDynamoDb accountsDynamoDb;
public MigrationRetryAccountsTableCrawler(
final MigrationRetryAccounts retryAccounts,
final AccountsManager accountsManager,
final AccountsDynamoDb accountsDynamoDb,
final FaultTolerantRedisCluster cluster,
final ScheduledExecutorService executorService) throws IOException {
super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService);
this.retryAccounts = retryAccounts;
this.accountsManager = accountsManager;
this.accountsDynamoDb = accountsDynamoDb;
}
@Override
public void doPeriodicWork() {
final List<UUID> uuids = this.retryAccounts.getUuids(MAX_BATCH_SIZE);
final List<UUID> processedUuids = new ArrayList<>(uuids.size());
try {
for (UUID uuid : uuids) {
try {
final Optional<Account> maybeDynamoAccount = accountsDynamoDb.get(uuid);
if (maybeDynamoAccount.isEmpty()) {
accountsManager.get(uuid).ifPresent(account -> {
accountsDynamoDb.migrate(account);
MIGRATED_COUNTER.increment();
});
}
processedUuids.add(uuid);
TOTAL_COUNTER.increment();
} catch (final Exception e) {
ERROR_COUNTER.increment();
logger.warn("Failed to migrate account");
}
}
} finally {
this.retryAccounts.delete(processedUuids);
}
}
}

View File

@@ -5,18 +5,16 @@
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.jdbi.v3.core.JdbiException;
import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
import org.jdbi.v3.core.JdbiException;
import org.whispersystems.textsecuregcm.util.Constants;
public class Usernames {
@@ -34,7 +32,6 @@ public class Usernames {
public Usernames(FaultTolerantDatabase database) {
this.database = database;
this.database.getDatabase().registerRowMapper(new AccountRowMapper());
}
public boolean put(UUID uuid, String username) {

View File

@@ -1,36 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage.mappers;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
public class AccountRowMapper implements RowMapper<Account> {
private static ObjectMapper mapper = SystemMapper.getMapper();
@Override
public Account map(ResultSet resultSet, StatementContext ctx) throws SQLException {
try {
Account account = mapper.readValue(resultSet.getString(Accounts.DATA), Account.class);
account.setNumber(resultSet.getString(Accounts.NUMBER));
account.setUuid(UUID.fromString(resultSet.getString(Accounts.UID)));
account.setVersion(resultSet.getInt(Accounts.VERSION));
return account;
} catch (IOException e) {
throw new SQLException(e);
}
}
}

View File

@@ -20,9 +20,6 @@ import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Metrics;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
@@ -30,14 +27,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
@@ -49,9 +44,6 @@ import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
@@ -62,7 +54,6 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@@ -70,11 +61,9 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
private final Logger logger = LoggerFactory.getLogger(DeleteUserCommand.class);
public DeleteUserCommand() {
super(new Application<WhisperServerConfiguration>() {
super(new Application<>() {
@Override
public void run(WhisperServerConfiguration configuration, Environment environment)
throws Exception
{
public void run(WhisperServerConfiguration configuration, Environment environment) {
}
}, "rmuser", "remove user");
@@ -105,9 +94,6 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
ClientResources redisClusterClientResources = ClientResources.builder().build();
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
DynamoDbClient reportMessagesDynamoDb = DynamoDbFromConfig.client(
configuration.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -118,16 +104,9 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig.client(
configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
accountsDynamoDbMigrationThreadPool);
DynamoDbClient deletedAccountsDynamoDbClient = DynamoDbFromConfig.client(
configuration.getDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationMismatchedAccountsDynamoDb = DynamoDbFromConfig.client(
configuration.getMigrationMismatchedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster",
configuration.getCacheClusterConfiguration(), redisClusterClientResources);
@@ -151,39 +130,41 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
configuration.getAppConfig().getConfigurationName());
dynamicConfigurationManager.start();
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(
dynamicConfigurationManager);
DynamoDbClient migrationDeletedAccountsDynamoDb = DynamoDbFromConfig.client(
configuration.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(
configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingAccountsDynamoDbClient = DynamoDbFromConfig.client(configuration.getPendingAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
AmazonDynamoDB deletedAccountsLockDynamoDbClient = AmazonDynamoDBClientBuilder.standard()
.withRegion(configuration.getDeletedAccountsLockDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getDeletedAccountsLockDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getDeletedAccountsLockDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(
((int) configuration.getDeletedAccountsLockDynamoDbConfiguration().getClientExecutionTimeout()
.toMillis()))
.withRequestTimeout(
(int) configuration.getDeletedAccountsLockDynamoDbConfiguration().getClientRequestTimeout()
.toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.build();
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, configuration.getDeletedAccountsDynamoDbConfiguration().getTableName(), configuration.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb, configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
VerificationCodeStore pendingAccounts = new VerificationCodeStore(pendingAccountsDynamoDbClient, configuration.getPendingAccountsDynamoDbConfiguration().getTableName());
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient,
configuration.getDeletedAccountsDynamoDbConfiguration().getTableName(),
configuration.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
VerificationCodeStore pendingAccounts = new VerificationCodeStore(pendingAccountsDynamoDbClient,
configuration.getPendingAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient, accountsDynamoDbMigrationThreadPool, configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts, migrationRetryAccounts);
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeysDynamoDb, configuration.getKeysDynamoDbConfiguration().getTableName());
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, configuration.getMessageDynamoDbConfiguration().getTableName(), configuration.getMessageDynamoDbConfiguration().getTimeToLive());
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient,
configuration.getAccountsDynamoDbConfiguration().getTableName(),
configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeysDynamoDb,
configuration.getKeysDynamoDbConfiguration().getTableName());
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb,
configuration.getMessageDynamoDbConfiguration().getTableName(),
configuration.getMessageDynamoDbConfiguration().getTimeToLive());
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster",
configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster",
configuration.getMetricsClusterConfiguration(), redisClusterClientResources);
SecureBackupClient secureBackupClient = new SecureBackupClient(backupCredentialsGenerator, backupServiceExecutor,
@@ -203,16 +184,13 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Metrics.globalRegistry);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
reportMessageManager);
MigrationMismatchedAccounts mismatchedAccounts = new MigrationMismatchedAccounts(
migrationMismatchedAccountsDynamoDb,
configuration.getMigrationMismatchedAccountsDynamoDbConfiguration().getTableName());
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, mismatchedAccounts, usernamesManager,
profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager,
AccountsManager accountsManager = new AccountsManager(accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager,
profilesManager, pendingAccountsManager, secureStorageClient, secureBackupClient,
dynamicConfigurationManager);
for (String user : users) {

View File

@@ -21,22 +21,17 @@ import io.micrometer.core.instrument.Metrics;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
@@ -47,9 +42,6 @@ import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.MigrationDeletedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationMismatchedAccounts;
import org.whispersystems.textsecuregcm.storage.MigrationRetryAccounts;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.ReportMessageDynamoDb;
@@ -60,7 +52,6 @@ import org.whispersystems.textsecuregcm.storage.Usernames;
import org.whispersystems.textsecuregcm.storage.UsernamesManager;
import org.whispersystems.textsecuregcm.storage.VerificationCodeStore;
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@@ -105,9 +96,6 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
ClientResources redisClusterClientResources = ClientResources.builder().build();
ThreadPoolExecutor accountsDynamoDbMigrationThreadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>());
DynamoDbClient reportMessagesDynamoDb = DynamoDbFromConfig
.client(configuration.getReportMessageDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -118,10 +106,6 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
DynamoDbClient accountsDynamoDbClient = DynamoDbFromConfig
.client(configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbAsyncClient accountsDynamoDbAsyncClient = DynamoDbFromConfig
.asyncClient(configuration.getAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create(),
accountsDynamoDbMigrationThreadPool);
DynamoDbClient deletedAccountsDynamoDbClient = DynamoDbFromConfig
.client(configuration.getDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -148,18 +132,6 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
configuration.getAppConfig().getConfigurationName());
dynamicConfigurationManager.start();
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(
dynamicConfigurationManager);
DynamoDbClient migrationDeletedAccountsDynamoDb = DynamoDbFromConfig
.client(configuration.getMigrationDeletedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationMismatchedAccountsDynamoDb = DynamoDbFromConfig
.client(configuration.getMigrationMismatchedAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig
.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
DynamoDbClient pendingAccountsDynamoDbClient = DynamoDbFromConfig
.client(configuration.getPendingAccountsDynamoDbConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -178,18 +150,13 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient,
configuration.getDeletedAccountsDynamoDbConfiguration().getTableName(),
configuration.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb,
configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb,
configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
VerificationCodeStore pendingAccounts = new VerificationCodeStore(pendingAccountsDynamoDbClient,
configuration.getPendingAccountsDynamoDbConfiguration().getTableName());
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, accountsDynamoDbAsyncClient,
accountsDynamoDbMigrationThreadPool, configuration.getAccountsDynamoDbConfiguration().getTableName(),
configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName(), migrationDeletedAccounts,
migrationRetryAccounts);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient,
configuration.getAccountsDynamoDbConfiguration().getTableName(),
configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName()
);
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
@@ -221,17 +188,14 @@ public class SetUserDiscoverabilityCommand extends EnvironmentCommand<WhisperSer
Metrics.globalRegistry);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager,
reportMessageManager);
MigrationMismatchedAccounts mismatchedAccounts = new MigrationMismatchedAccounts(
migrationMismatchedAccountsDynamoDb,
configuration.getMigrationMismatchedAccountsDynamoDbConfiguration().getTableName());
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts,
deletedAccountsLockDynamoDbClient,
configuration.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
StoredVerificationCodeManager pendingAccountsManager = new StoredVerificationCodeManager(pendingAccounts);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, mismatchedAccounts, usernamesManager,
AccountsManager accountsManager = new AccountsManager(accountsDynamoDb, cacheCluster,
deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager,
profilesManager,
pendingAccountsManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager,
pendingAccountsManager, secureStorageClient, secureBackupClient,
dynamicConfigurationManager);
Optional<Account> maybeAccount;

View File

@@ -1,47 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import net.sourceforge.argparse4j.inf.Namespace;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import io.dropwizard.cli.ConfiguredCommand;
import io.dropwizard.setup.Bootstrap;
public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration> {
private final Logger logger = LoggerFactory.getLogger(VacuumCommand.class);
public VacuumCommand() {
super("vacuum", "Vacuum Postgres Tables");
}
@Override
protected void run(Bootstrap<WhisperServerConfiguration> bootstrap,
Namespace namespace,
WhisperServerConfiguration config)
throws Exception
{
DatabaseConfiguration accountDbConfig = config.getAbuseDatabaseConfiguration();
Jdbi accountJdbi = Jdbi.create(accountDbConfig.getUrl(), accountDbConfig.getUser(), accountDbConfig.getPassword());
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_vacuum", accountJdbi, accountDbConfig.getCircuitBreakerConfiguration());
Accounts accounts = new Accounts(accountDatabase);
logger.info("Vacuuming accounts...");
accounts.vacuum();
Thread.sleep(3000);
System.exit(0);
}
}