Add Accounts DynamoDB

* Add additional test cases to AccountsTest
* Migrate AccountsManagerTest to JUnit 5
* Add AccountsDynamoDbConfiguration
* Add Account.dynamoDbMigrationversion
* Add DynamicAccountsDynamoDbMigrationConfiguration
* Add AccountsDynamoDb to AccountsManager
* Add AccountsDynamoDbMigrator
This commit is contained in:
Chris Eager
2021-04-16 14:24:24 -05:00
committed by GitHub
parent f6c9b2b6e7
commit 59bbd0c43c
18 changed files with 1446 additions and 87 deletions

View File

@@ -7,7 +7,15 @@ package org.whispersystems.textsecuregcm;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.Configuration;
import io.dropwizard.client.JerseyClientConfiguration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import org.whispersystems.textsecuregcm.configuration.AccountDatabaseCrawlerConfiguration;
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.AccountsDynamoDbConfiguration;
import org.whispersystems.textsecuregcm.configuration.ApnConfiguration;
import org.whispersystems.textsecuregcm.configuration.AppConfigConfiguration;
import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguration;
@@ -17,7 +25,6 @@ import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.DynamoDbConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
import org.whispersystems.textsecuregcm.configuration.MessageDynamoDbConfiguration;
@@ -39,13 +46,6 @@ import org.whispersystems.textsecuregcm.configuration.VoiceVerificationConfigura
import org.whispersystems.textsecuregcm.configuration.ZkConfig;
import org.whispersystems.websocket.configuration.WebSocketConfiguration;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/** @noinspection MismatchedQueryAndUpdateOfCollection, WeakerAccess */
public class WhisperServerConfiguration extends Configuration {
@@ -129,6 +129,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private DynamoDbConfiguration keysDynamoDb;
@Valid
@NotNull
@JsonProperty
private AccountsDynamoDbConfiguration accountsDynamoDb;
@Valid
@NotNull
@JsonProperty
@@ -302,6 +307,10 @@ public class WhisperServerConfiguration extends Configuration {
return keysDynamoDb;
}
public AccountsDynamoDbConfiguration getAccountsDynamoDbConfiguration() {
return accountsDynamoDb;
}
public DatabaseConfiguration getAbuseDatabaseConfiguration() {
return abuseDatabase;
}

View File

@@ -12,6 +12,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.s3.AmazonS3;
@@ -141,6 +142,8 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDbMigrator;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
@@ -276,10 +279,20 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.withRequestTimeout((int) config.getKeysDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder accountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
DynamoDB preKeyDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, new DynamoDB(accountsDynamoDbClient), config.getAccountsDynamoDbConfiguration().getTableName(), config.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
Usernames usernames = new Usernames(accountDatabase);
@@ -346,7 +359,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
@@ -379,6 +392,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
}
accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager));
accountDatabaseCrawlerListeners.add(new RegistrationLockVersionCounter(metricsCluster, config.getMetricsFactory()));
accountDatabaseCrawlerListeners.add(new AccountsDynamoDbMigrator(accountsDynamoDb, dynamicConfigurationManager));
HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();
FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().getFixerApiKey());

View File

@@ -0,0 +1,13 @@
package org.whispersystems.textsecuregcm.configuration;
import javax.validation.constraints.NotNull;
public class AccountsDynamoDbConfiguration extends DynamoDbConfiguration {
@NotNull
private String phoneNumberTableName;
public String getPhoneNumberTableName() {
return phoneNumberTableName;
}
}

View File

@@ -0,0 +1,48 @@
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
public class DynamicAccountsDynamoDbMigrationConfiguration {
@JsonProperty
boolean backgroundMigrationEnabled;
@JsonProperty
boolean deleteEnabled;
@JsonProperty
boolean writeEnabled;
@JsonProperty
boolean readEnabled;
public boolean isBackgroundMigrationEnabled() {
return backgroundMigrationEnabled;
}
public void setDeleteEnabled(boolean deleteEnabled) {
this.deleteEnabled = deleteEnabled;
}
public boolean isDeleteEnabled() {
return deleteEnabled;
}
public void setWriteEnabled(boolean writeEnabled) {
this.writeEnabled = writeEnabled;
}
public boolean isWriteEnabled() {
return writeEnabled;
}
@VisibleForTesting
public void setReadEnabled(boolean readEnabled) {
this.readEnabled = readEnabled;
}
public boolean isReadEnabled() {
return readEnabled;
}
}

View File

@@ -44,6 +44,9 @@ public class DynamicConfiguration {
@JsonProperty
private DynamicSignupCaptchaConfiguration signupCaptcha = new DynamicSignupCaptchaConfiguration();
@JsonProperty
private DynamicAccountsDynamoDbMigrationConfiguration accountsDynamoDbMigration = new DynamicAccountsDynamoDbMigrationConfiguration();
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@@ -86,4 +89,8 @@ public class DynamicConfiguration {
public DynamicSignupCaptchaConfiguration getSignupCaptchaConfiguration() {
return signupCaptcha;
}
public DynamicAccountsDynamoDbMigrationConfiguration getAccountsDynamoDbMigrationConfiguration() {
return accountsDynamoDbMigration;
}
}

View File

@@ -58,6 +58,9 @@ public class Account implements Principal {
@JsonProperty("inCds")
private boolean discoverableByPhoneNumber = true;
@JsonProperty("_ddbV")
private int dynamoDbMigrationVersion;
@JsonIgnore
private Device authenticatedDevice;
@@ -265,6 +268,14 @@ public class Account implements Principal {
this.discoverableByPhoneNumber = discoverableByPhoneNumber;
}
public int getDynamoDbMigrationVersion() {
return dynamoDbMigrationVersion;
}
public void setDynamoDbMigrationVersion(int dynamoDbMigrationVersion) {
this.dynamoDbMigrationVersion = dynamoDbMigrationVersion;
}
// Principal implementation
@Override

View File

@@ -0,0 +1,17 @@
package org.whispersystems.textsecuregcm.storage;
import java.util.Optional;
import java.util.UUID;
public interface AccountStore {
boolean create(Account account);
void update(Account account);
Optional<Account> get(String number);
Optional<Account> get(UUID uuid);
void delete(final UUID uuid);
}

View File

@@ -4,23 +4,22 @@
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
import org.whispersystems.textsecuregcm.storage.mappers.AccountRowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
public class Accounts {
public class Accounts implements AccountStore {
public static final String ID = "id";
public static final String UID = "uuid";
@@ -46,6 +45,7 @@ public class Accounts {
this.database.getDatabase().registerRowMapper(new AccountRowMapper());
}
@Override
public boolean create(Account account) {
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = createTimer.time()) {
@@ -65,6 +65,7 @@ public class Accounts {
}));
}
@Override
public void update(Account account) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = updateTimer.time()) {
@@ -78,6 +79,7 @@ public class Accounts {
}));
}
@Override
public Optional<Account> get(String number) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getByNumberTimer.time()) {
@@ -89,6 +91,7 @@ public class Accounts {
}));
}
@Override
public Optional<Account> get(UUID uuid) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getByUuidTimer.time()) {
@@ -123,6 +126,7 @@ public class Accounts {
}));
}
@Override
public void delete(final UUID uuid) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = deleteTimer.time()) {

View File

@@ -0,0 +1,262 @@
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.AttributeUpdate;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.CancellationReason;
import com.amazonaws.services.dynamodbv2.model.Delete;
import com.amazonaws.services.dynamodbv2.model.GetItemResult;
import com.amazonaws.services.dynamodbv2.model.Put;
import com.amazonaws.services.dynamodbv2.model.ReturnValuesOnConditionCheckFailure;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItem;
import com.amazonaws.services.dynamodbv2.model.TransactWriteItemsRequest;
import com.amazonaws.services.dynamodbv2.model.TransactionCanceledException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
public class AccountsDynamoDb extends AbstractDynamoDbStore implements AccountStore {
// uuid, primary key
static final String KEY_ACCOUNT_UUID = "U";
// phone number
static final String ATTR_ACCOUNT_E164 = "P";
// account, serialized to JSON
static final String ATTR_ACCOUNT_DATA = "D";
static final String ATTR_MIGRATION_VERSION = "V";
private final AmazonDynamoDB client;
private final Table accountsTable;
private final String phoneNumbersTableName;
private static final Timer CREATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "create"));
private static final Timer UPDATE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "update"));
private static final Timer GET_BY_NUMBER_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByNumber"));
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "getByUuid"));
private static final Timer DELETE_TIMER = Metrics.timer(name(AccountsDynamoDb.class, "delete"));
public AccountsDynamoDb(AmazonDynamoDB client, DynamoDB dynamoDb, String accountsTableName, String phoneNumbersTableName) {
super(dynamoDb);
this.client = client;
this.accountsTable = dynamoDb.getTable(accountsTableName);
this.phoneNumbersTableName = phoneNumbersTableName;
}
@Override
public boolean create(Account account) {
return CREATE_TIMER.record(() -> {
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid());
final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberConstraintPut, accountPut);
try {
client.transactWriteItems(request);
} catch (TransactionCanceledException e) {
final CancellationReason accountCancellationReason = e.getCancellationReasons().get(1);
if ("ConditionalCheckFailed".equals(accountCancellationReason.getCode())) {
throw new IllegalArgumentException("uuid present with different phone number");
}
final CancellationReason phoneNumberConstraintCancellationReason = e.getCancellationReasons().get(0);
if ("ConditionalCheckFailed".equals(phoneNumberConstraintCancellationReason.getCode())) {
ByteBuffer actualAccountUuid = phoneNumberConstraintCancellationReason.getItem().get(KEY_ACCOUNT_UUID).getB();
account.setUuid(UUIDUtil.fromByteBuffer(actualAccountUuid));
update(account);
return false;
}
// this shouldnt happen
throw new RuntimeException("could not create account");
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
return true;
});
}
private TransactWriteItem buildPutWriteItemForAccount(Account account, UUID uuid) throws JsonProcessingException {
return new TransactWriteItem()
.withPut(
new Put()
.withTableName(accountsTable.getTableName())
.withItem(Map.of(
KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)),
ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()),
ATTR_ACCOUNT_DATA, new AttributeValue()
.withB(ByteBuffer.wrap(SystemMapper.getMapper().writeValueAsBytes(account))),
ATTR_MIGRATION_VERSION, new AttributeValue().withN(
String.valueOf(account.getDynamoDbMigrationVersion()))))
.withConditionExpression("attribute_not_exists(#number) OR #number = :number")
.withExpressionAttributeNames(Map.of("#number", ATTR_ACCOUNT_E164))
.withExpressionAttributeValues(Map.of(":number", new AttributeValue(account.getNumber()))));
}
private TransactWriteItem buildPutWriteItemForPhoneNumberConstraint(Account account, UUID uuid) {
return new TransactWriteItem()
.withPut(
new Put()
.withTableName(phoneNumbersTableName)
.withItem(Map.of(
ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()),
KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))
.withConditionExpression(
"attribute_not_exists(#number) OR (attribute_exists(#number) AND #uuid = :uuid)")
.withExpressionAttributeNames(
Map.of("#uuid", KEY_ACCOUNT_UUID,
"#number", ATTR_ACCOUNT_E164))
.withExpressionAttributeValues(
Map.of(":uuid", new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid))))
.withReturnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD));
}
@Override
public void update(Account account) {
UPDATE_TIMER.record(() -> {
UpdateItemSpec updateItemSpec;
try {
updateItemSpec = new UpdateItemSpec()
.withPrimaryKey(
new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(account.getUuid())))
.withAttributeUpdate(
new AttributeUpdate(ATTR_ACCOUNT_DATA).put(SystemMapper.getMapper().writeValueAsBytes(account)),
new AttributeUpdate(ATTR_MIGRATION_VERSION).put(String.valueOf(account.getDynamoDbMigrationVersion())));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
accountsTable.updateItem(updateItemSpec);
});
}
@Override
public Optional<Account> get(String number) {
return GET_BY_NUMBER_TIMER.record(() -> {
final GetItemResult phoneNumberAndUuid = client.getItem(phoneNumbersTableName,
Map.of(ATTR_ACCOUNT_E164, new AttributeValue(number)), true);
return Optional.ofNullable(phoneNumberAndUuid.getItem())
.map(item -> item.get(KEY_ACCOUNT_UUID).getB())
.map(uuid -> accountsTable.getItem(new GetItemSpec()
.withPrimaryKey(KEY_ACCOUNT_UUID, uuid.array())
.withConsistentRead(true)))
.map(AccountsDynamoDb::fromItem);
});
}
@Override
public Optional<Account> get(UUID uuid) {
Optional<Item> maybeItem = GET_BY_UUID_TIMER.record(() ->
Optional.ofNullable(accountsTable.getItem(new GetItemSpec().
withPrimaryKey(new PrimaryKey(KEY_ACCOUNT_UUID, UUIDUtil.toByteBuffer(uuid)))
.withConsistentRead(true))));
return maybeItem.map(AccountsDynamoDb::fromItem);
}
@Override
public void delete(UUID uuid) {
DELETE_TIMER.record(() -> {
Optional<Account> maybeAccount = get(uuid);
maybeAccount.ifPresent(account -> {
TransactWriteItem phoneNumberDelete = new TransactWriteItem()
.withDelete(new Delete()
.withTableName(phoneNumbersTableName)
.withKey(Map.of(ATTR_ACCOUNT_E164, new AttributeValue(account.getNumber()))));
TransactWriteItem accountDelete = new TransactWriteItem().withDelete(
new Delete()
.withTableName(accountsTable.getTableName())
.withKey(Map.of(KEY_ACCOUNT_UUID, new AttributeValue().withB(UUIDUtil.toByteBuffer(uuid)))));
TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberDelete, accountDelete);
client.transactWriteItems(request);
});
});
}
public boolean migrate(Account account) {
try {
TransactWriteItem phoneNumberConstraintPut = buildPutWriteItemForPhoneNumberConstraint(account, account.getUuid());
TransactWriteItem accountPut = buildPutWriteItemForAccount(account, account.getUuid());
accountPut.getPut()
.setConditionExpression("attribute_not_exists(#uuid) OR (attribute_exists(#uuid) AND #version < :version)");
accountPut.getPut()
.setExpressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID,
"#version", ATTR_MIGRATION_VERSION));
accountPut.getPut()
.setExpressionAttributeValues(
Map.of(":version", new AttributeValue().withN(String.valueOf(account.getDynamoDbMigrationVersion()))));
final TransactWriteItemsRequest request = new TransactWriteItemsRequest()
.withTransactItems(phoneNumberConstraintPut, accountPut);
client.transactWriteItems(request);
return true;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
} catch (TransactionCanceledException ignored) {
// account is already migrated
}
return false;
}
@VisibleForTesting
static Account fromItem(Item item) {
try {
Account account = SystemMapper.getMapper().readValue(item.getBinary(ATTR_ACCOUNT_DATA), Account.class);
account.setNumber(item.getString(ATTR_ACCOUNT_E164));
account.setUuid(UUIDUtil.fromByteBuffer(item.getByteBuffer(KEY_ACCOUNT_UUID)));
return account;
} catch (IOException e) {
throw new RuntimeException("Could not read stored account data", e);
}
}
}

View File

@@ -0,0 +1,53 @@
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
public class AccountsDynamoDbMigrator extends AccountDatabaseCrawlerListener {
private static final Counter MIGRATED_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "migrated"));
private static final Counter ERROR_COUNTER = Metrics.counter(name(AccountsDynamoDbMigrator.class, "error"));
private final AccountsDynamoDb accountsDynamoDb;
private final DynamicConfigurationManager dynamicConfigurationManager;
public AccountsDynamoDbMigrator(final AccountsDynamoDb accountsDynamoDb, final DynamicConfigurationManager dynamicConfigurationManager) {
this.accountsDynamoDb = accountsDynamoDb;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
public void onCrawlStart() {
}
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
}
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
if (!dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isBackgroundMigrationEnabled()) {
return;
}
for (Account account : chunkAccounts) {
try {
final boolean migrated = accountsDynamoDb.migrate(account);
if (migrated) {
MIGRATED_COUNTER.increment();
}
} catch (final Exception e) {
ERROR_COUNTER.increment();
}
}
}
}

View File

@@ -14,15 +14,20 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
@@ -50,10 +55,14 @@ public class AccountsManager {
private static final String COUNTRY_CODE_TAG_NAME = "country";
private static final String DELETION_REASON_TAG_NAME = "reason";
private static final String DYNAMO_MIGRATION_ERROR_COUNTER = name(AccountsManager.class, "migration", "error");
private static final Counter DYNAMO_MIGRATION_COMPARISON_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "comparisons"));
private static final Counter DYNAMO_MIGRATION_MISMATCH_COUNTER = Metrics.counter(name(AccountsManager.class, "migration", "mismatches"));
private final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
private final Accounts accounts;
private final AccountsDynamoDb accountsDynamoDb;
private final FaultTolerantRedisCluster cacheCluster;
private final DirectoryQueue directoryQueue;
private final KeysDynamoDb keysDynamoDb;
@@ -64,6 +73,9 @@ public class AccountsManager {
private final SecureBackupClient secureBackupClient;
private final ObjectMapper mapper;
private final DynamicConfigurationManager dynamicConfigurationManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
public enum DeletionReason {
ADMIN_DELETED("admin"),
EXPIRED ("expired"),
@@ -76,11 +88,13 @@ public class AccountsManager {
}
}
public AccountsManager(Accounts accounts, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue,
public AccountsManager(Accounts accounts, AccountsDynamoDb accountsDynamoDb, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue,
final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager,
final ProfilesManager profilesManager, final SecureStorageClient secureStorageClient,
final SecureBackupClient secureBackupClient) {
final SecureBackupClient secureBackupClient,
final ExperimentEnrollmentManager experimentEnrollmentManager, final DynamicConfigurationManager dynamicConfigurationManager) {
this.accounts = accounts;
this.accountsDynamoDb = accountsDynamoDb;
this.cacheCluster = cacheCluster;
this.directoryQueue = directoryQueue;
this.keysDynamoDb = keysDynamoDb;
@@ -90,6 +104,9 @@ public class AccountsManager {
this.secureStorageClient = secureStorageClient;
this.secureBackupClient = secureBackupClient;
this.mapper = SystemMapper.getMapper();
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
public boolean create(Account account) {
@@ -97,14 +114,26 @@ public class AccountsManager {
boolean freshUser = databaseCreate(account);
redisSet(account);
if (dynamoWriteEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoCreate(account), Optional.of(account.getUuid()), freshUser,
Boolean::compareTo, "create");
}
return freshUser;
}
}
public void update(Account account) {
try (Timer.Context ignored = updateTimer.time()) {
account.setDynamoDbMigrationVersion(account.getDynamoDbMigrationVersion() + 1);
redisSet(account);
databaseUpdate(account);
if (dynamoWriteEnabled()) {
runSafelyAndRecordMetrics(() -> {
dynamoUpdate(account);
return true;
}, Optional.of(account.getUuid()), true, Boolean::compareTo, "update");
}
}
}
@@ -121,6 +150,11 @@ public class AccountsManager {
if (!account.isPresent()) {
account = databaseGet(number);
account.ifPresent(value -> redisSet(value));
if (dynamoReadEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(number), Optional.empty(), account, this::compareAccounts,
"getByNumber");
}
}
return account;
@@ -134,6 +168,11 @@ public class AccountsManager {
if (!account.isPresent()) {
account = databaseGet(uuid);
account.ifPresent(value -> redisSet(value));
if (dynamoReadEnabled()) {
runSafelyAndRecordMetrics(() -> dynamoGet(uuid), Optional.of(uuid), account, this::compareAccounts,
"getByUuid");
}
}
return account;
@@ -165,6 +204,16 @@ public class AccountsManager {
redisDelete(account);
databaseDelete(account);
if (dynamoDeleteEnabled()) {
try {
dynamoDelete(account);
} catch (final Exception e) {
logger.error("Could not delete account {} from dynamo", account.getUuid().toString());
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", "delete");
}
}
} catch (final Exception e) {
logger.warn("Failed to delete account", e);
@@ -265,4 +314,107 @@ public class AccountsManager {
private void databaseDelete(final Account account) {
accounts.delete(account.getUuid());
}
private Optional<Account> dynamoGet(String number) {
return accountsDynamoDb.get(number);
}
private Optional<Account> dynamoGet(UUID uuid) {
return accountsDynamoDb.get(uuid);
}
private boolean dynamoCreate(Account account) {
return accountsDynamoDb.create(account);
}
private void dynamoUpdate(Account account) {
accountsDynamoDb.update(account);
}
private void dynamoDelete(final Account account) {
accountsDynamoDb.delete(account.getUuid());
}
private boolean dynamoDeleteEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isDeleteEnabled();
}
private boolean dynamoReadEnabled() {
return dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isReadEnabled();
}
private boolean dynamoWriteEnabled() {
return dynamoDeleteEnabled()
&& dynamicConfigurationManager.getConfiguration().getAccountsDynamoDbMigrationConfiguration().isWriteEnabled();
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public int compareAccounts(final Optional<Account> maybeDatabaseAccount, final Optional<Account> maybeDynamoAccount) {
if (maybeDatabaseAccount.isEmpty() && maybeDynamoAccount.isEmpty()) {
return 0;
}
if (maybeDatabaseAccount.isEmpty() || maybeDynamoAccount.isEmpty()) {
return 1;
}
final Account databaseAccount = maybeDatabaseAccount.get();
final Account dynamoAccount = maybeDynamoAccount.get();
final int uuidCompare = databaseAccount.getUuid().compareTo(dynamoAccount.getUuid());
if (uuidCompare != 0) {
return uuidCompare;
}
final int numberCompare = databaseAccount.getNumber().compareTo(dynamoAccount.getNumber());
if (numberCompare != 0) {
return numberCompare;
}
try {
final byte[] databaseSerialized = mapper.writeValueAsBytes(databaseAccount);
final byte[] dynamoSerialized = mapper.writeValueAsBytes(dynamoAccount);
return Arrays.compare(databaseSerialized, dynamoSerialized);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private <T> void runSafelyAndRecordMetrics(Callable<T> callable, Optional<UUID> maybeUuid, final T databaseResult, final Comparator<T> comparator, final String action) {
if (maybeUuid.isPresent()) {
// the only time we dont have a UUID is in getByNumber, which is sufficiently low volume to not be a concern, and
// it will also be gated by the global readEnabled configuration
final boolean enrolled = experimentEnrollmentManager.isEnrolled(maybeUuid.get(), "accountsDynamoDbMigration");
if (!enrolled) {
return;
}
}
try {
final T dynamoResult = callable.call();
compare(databaseResult, dynamoResult, comparator);
} catch (final Exception e) {
logger.error("Error running " + action + " ih Dynamo", e);
Metrics.counter(DYNAMO_MIGRATION_ERROR_COUNTER, "action", action).increment();
}
}
private <T> void compare(final T databaseResult, final T dynamoResult, final Comparator<T> comparator) {
DYNAMO_MIGRATION_COMPARISON_COUNTER.increment();
if (comparator.compare(databaseResult, dynamoResult) != 0) {
DYNAMO_MIGRATION_MISMATCH_COUNTER.increment();
}
}
}

View File

@@ -11,20 +11,27 @@ import java.util.UUID;
public class UUIDUtil {
public static byte[] toBytes(final UUID uuid) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
return byteBuffer.array();
return toByteBuffer(uuid).array();
}
public static ByteBuffer toByteBuffer(final UUID uuid) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
return byteBuffer.flip();
}
public static UUID fromBytes(final byte[] bytes) {
if (bytes.length != 16) {
throw new IllegalArgumentException("unexpected byte array length; was " + bytes.length + " but expected 16");
}
final ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
final long mostSigBits = byteBuffer.getLong();
final long leastSigBits = byteBuffer.getLong();
return new UUID(mostSigBits, leastSigBits);
return fromByteBuffer(ByteBuffer.wrap(bytes));
}
public static UUID fromByteBuffer(final ByteBuffer byteBuffer) {
if (byteBuffer.array().length != 16) {
throw new IllegalArgumentException("unexpected byte array length; was " + byteBuffer.array().length + " but expected 16");
}
final long mostSigBits = byteBuffer.getLong();
final long leastSigBits = byteBuffer.getLong();
return new UUID(mostSigBits, leastSigBits);
}
}

View File

@@ -9,6 +9,7 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -26,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.securebackup.SecureBackupClient;
@@ -33,7 +35,9 @@ import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
@@ -100,9 +104,19 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
.withRequestTimeout((int) configuration.getKeysDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
AmazonDynamoDBClientBuilder accountsDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getAccountsDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getAccountsDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getAccountsDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
DynamoDB preKeysDynamoDb = new DynamoDB(keysDynamoDbClientBuilder.build());
AmazonDynamoDB accountsDynamoDbClient = accountsDynamoDbClientBuilder.build();
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build();
@@ -113,8 +127,12 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
ExternalServiceCredentialGenerator storageCredentialsGenerator = new ExternalServiceCredentialGenerator(configuration.getSecureStorageServiceConfiguration().getUserAuthenticationTokenSharedSecret(), new byte[0], false);
DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager(configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getConfigurationName());
dynamicConfigurationManager.start();
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
Accounts accounts = new Accounts(accountDatabase);
AccountsDynamoDb accountsDynamoDb = new AccountsDynamoDb(accountsDynamoDbClient, new DynamoDB(accountsDynamoDbClient), configuration.getAccountsDynamoDbConfiguration().getTableName(), configuration.getAccountsDynamoDbConfiguration().getPhoneNumberTableName());
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
@@ -131,13 +149,13 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient);
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
for (String user: users) {
Optional<Account> account = accountsManager.get(user);
if (account.isPresent()) {
accountsManager.delete(account.get(), AccountsManager.DeletionReason.ADMIN_DELETED);
accountsManager.delete(account.get(), DeletionReason.ADMIN_DELETED);
logger.warn("Removed " + account.get().getNumber());
} else {
logger.warn("Account not found");