Build Dynamo DB backed Message Store (#358)

* Work in progress...

* Finish first pass draft of MessagesDynamoDb

* Use begins_with everywhere for destination device id

* Remove now unused methods

* First basic test built

* Add another test case

* Remove comment

* Verify more of the message contents

* Ensure all methods are tested

* Integrate MessagesDynamoDb into the MessagesManager

This change plugs the MessagesDynamoDb class into the live serving
flow in MessagesManager.

Tests are not yet as comprehensive for this big a change as they
should be, but they now compile and pass so checkpointing here with a
commit.

* Put DynamoDB before RDBS when deleting specific messages

* Extract method

* Make aws sdk version into a property

* Rename clientBuilder

* Discard messages with no GUID

* Unify batching logic into one function

* Comment on the source of the value in this constant

* Inline method

* Variable name swizzle

* Add timers to all public methods

* Add missing return statements

* Reject messages that are too large with response code 413

* Add configuration to control dynamo DB timeouts

* Set server timestamp from the ReceiptSender

* Change to shorter key names to optimize IOPS

* Fix tests broken by changing column names

* Fix broken copyright template output

* Remove copyright template error text

* Add experiments to control use of dynamo and rds in message storage

* Specify instance profile credentials for the dynamic configuration manager

* Use property for aws sdk version

* Switch dynamo to instance profile credentials

* Add metrics to the batch write loop

* Use placeholders in logging
This commit is contained in:
Ehren Kret
2021-02-03 10:03:19 -06:00
committed by GitHub
parent d71082b491
commit 0dcb4b645c
23 changed files with 987 additions and 91 deletions

View File

@@ -19,6 +19,7 @@ import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguratio
import org.whispersystems.textsecuregcm.configuration.AccountsDatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
import org.whispersystems.textsecuregcm.configuration.MessageDynamoDbConfiguration;
import org.whispersystems.textsecuregcm.configuration.MicrometerConfiguration;
import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration;
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
@@ -122,6 +123,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private RedisClusterConfiguration clientPresenceCluster;
@Valid
@NotNull
@JsonProperty
private MessageDynamoDbConfiguration messageDynamoDb;
@Valid
@NotNull
@JsonProperty
@@ -296,6 +302,10 @@ public class WhisperServerConfiguration extends Configuration {
return pushSchedulerCluster;
}
public MessageDynamoDbConfiguration getMessageDynamoDbConfiguration() {
return messageDynamoDb;
}
public DatabaseConfiguration getMessageStoreConfiguration() {
return messageStore;
}

View File

@@ -4,10 +4,14 @@
*/
package org.whispersystems.textsecuregcm;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.codahale.metrics.SharedMetricRegistries;
@@ -66,6 +70,7 @@ import org.whispersystems.textsecuregcm.controllers.SecureBackupController;
import org.whispersystems.textsecuregcm.controllers.SecureStorageController;
import org.whispersystems.textsecuregcm.controllers.StickerController;
import org.whispersystems.textsecuregcm.controllers.VoiceVerificationController;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.filters.TimestampResponseFilter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.liquibase.NameableMigrationsBundle;
@@ -124,6 +129,7 @@ import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
import org.whispersystems.textsecuregcm.storage.PendingAccountsManager;
@@ -263,6 +269,14 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, config.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi, config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
AmazonDynamoDBClientBuilder messageDynamoDbClientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(config.getMessageDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) config.getMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(messageDynamoDbClientBuilder.build());
Accounts accounts = new Accounts(accountDatabase);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
PendingDevices pendingDevices = new PendingDevices (accountDatabase);
@@ -271,6 +285,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Profiles profiles = new Profiles(accountDatabase);
Keys keys = new Keys(accountDatabase, config.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, config.getMessageDynamoDbConfiguration().getTableName(), config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
@@ -308,6 +323,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager(config.getAppConfig().getApplication(), config.getAppConfig().getEnvironment(), config.getAppConfig().getConfigurationName());
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
DirectoryManager directory = new DirectoryManager(directoryClient);
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
@@ -317,7 +335,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, experimentEnrollmentManager);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
@@ -329,8 +347,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager(config.getAppConfig().getApplication(), config.getAppConfig().getEnvironment(), config.getAppConfig().getConfigurationName());
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import java.time.Duration;
public class MessageDynamoDbConfiguration {
private String region;
private String tableName;
private Duration timeToLive = Duration.ofDays(7);
private Duration clientExecutionTimeout = Duration.ofSeconds(30);
private Duration clientRequestTimeout = Duration.ofSeconds(10);
@Valid
@NotEmpty
public String getRegion() {
return region;
}
@Valid
@NotEmpty
public String getTableName() {
return tableName;
}
@Valid
public Duration getTimeToLive() {
return timeToLive;
}
public Duration getClientExecutionTimeout() {
return clientExecutionTimeout;
}
public Duration getClientRequestTimeout() {
return clientRequestTimeout;
}
}

View File

@@ -76,7 +76,6 @@ public class MessageController {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter unidentifiedMeter = metricRegistry.meter(name(getClass(), "delivery", "unidentified"));
private final Meter identifiedMeter = metricRegistry.meter(name(getClass(), "delivery", "identified" ));
private final Meter rejectOversizeMessageMeter = metricRegistry.meter(name(getClass(), "rejectOversizeMessage"));
private final Meter rejectOver256kibMessageMeter = metricRegistry.meter(name(getClass(), "rejectOver256kibMessage"));
private final Timer sendMessageInternalTimer = metricRegistry.timer(name(getClass(), "sendMessageInternal"));
private final Histogram outgoingMessageListSizeHistogram = metricRegistry.histogram(name(getClass(), "outgoingMessageListSize"));
@@ -93,8 +92,7 @@ public class MessageController {
private static final String CONTENT_SIZE_DISTRIBUTION_NAME = name(MessageController.class, "messageContentSize");
private static final String OUTGOING_MESSAGE_LIST_SIZE_BYTES_DISTRIBUTION_NAME = name(MessageController.class, "outgoingMessageListSizeBytes");
private static final long MAX_MESSAGE_SIZE = DataSize.mebibytes(1).toBytes();
private static final long SMALLER_MAX_MESSAGE_SIZE = DataSize.kibibytes(256).toBytes();
private static final long MAX_MESSAGE_SIZE = DataSize.kibibytes(256).toBytes();
public MessageController(RateLimiters rateLimiters,
MessageSender messageSender,
@@ -154,12 +152,8 @@ public class MessageController {
Metrics.summary(CONTENT_SIZE_DISTRIBUTION_NAME, UserAgentTagUtil.getUserAgentTags(userAgent)).record(contentLength);
if (contentLength > MAX_MESSAGE_SIZE) {
// TODO Reject the request
rejectOversizeMessageMeter.mark();
}
if (contentLength > SMALLER_MAX_MESSAGE_SIZE) {
rejectOver256kibMessageMeter.mark();
return Response.status(Response.Status.REQUEST_ENTITY_TOO_LARGE).build();
}
}

View File

@@ -6,7 +6,6 @@
package org.whispersystems.textsecuregcm.experiment;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicExperimentEnrollmentConfiguration;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import java.util.Collections;
@@ -22,7 +21,7 @@ public class ExperimentEnrollmentManager {
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
public boolean isEnrolled(final Account account, final String experimentName) {
public boolean isEnrolled(final UUID accountUuid, final String experimentName) {
final Optional<DynamicExperimentEnrollmentConfiguration> maybeConfiguration = dynamicConfigurationManager.getConfiguration().getExperimentEnrollmentConfiguration(experimentName);
final Set<UUID> enrolledUuids = maybeConfiguration.map(DynamicExperimentEnrollmentConfiguration::getEnrolledUuids)
@@ -30,11 +29,11 @@ public class ExperimentEnrollmentManager {
final boolean enrolled;
if (enrolledUuids.contains(account.getUuid())) {
if (enrolledUuids.contains(accountUuid)) {
enrolled = true;
} else {
final int threshold = maybeConfiguration.map(DynamicExperimentEnrollmentConfiguration::getEnrollmentPercentage).orElse(0);
final int enrollmentHash = ((account.getUuid().hashCode() ^ experimentName.hashCode()) & Integer.MAX_VALUE) % 100;
final int enrollmentHash = ((accountUuid.hashCode() ^ experimentName.hashCode()) & Integer.MAX_VALUE) % 100;
enrolled = enrollmentHash < threshold;
}

View File

@@ -38,6 +38,7 @@ public class ReceiptSender {
Account destinationAccount = getDestinationAccount(destination);
Envelope.Builder message = Envelope.newBuilder()
.setServerTimestamp(System.currentTimeMillis())
.setSource(source.getNumber())
.setSourceUuid(source.getUuid().toString())
.setSourceDevice((int) source.getAuthenticatedDevice().get().getId())

View File

@@ -1,6 +1,7 @@
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.appconfig.AmazonAppConfig;
import com.amazonaws.services.appconfig.AmazonAppConfigClient;
import com.amazonaws.services.appconfig.model.GetConfigurationRequest;
@@ -44,6 +45,7 @@ public class DynamicConfigurationManager implements Managed {
public DynamicConfigurationManager(String application, String environment, String configurationName) {
this(AmazonAppConfigClient.builder()
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(10000).withRequestTimeout(10000))
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.build(),
application, environment, configurationName, UUID.randomUUID().toString());
}

View File

@@ -46,7 +46,6 @@ public class Messages {
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer storeTimer = metricRegistry.timer(name(Messages.class, "store" ));
private final Timer loadTimer = metricRegistry.timer(name(Messages.class, "load" ));
private final Timer hasMessagesTimer = metricRegistry.timer(name(Messages.class, "hasMessages" ));
private final Timer removeBySourceTimer = metricRegistry.timer(name(Messages.class, "removeBySource"));
private final Timer removeByGuidTimer = metricRegistry.timer(name(Messages.class, "removeByGuid" ));
private final Timer removeByIdTimer = metricRegistry.timer(name(Messages.class, "removeById" ));

View File

@@ -0,0 +1,377 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DeleteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Index;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.api.QueryApi;
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static com.codahale.metrics.MetricRegistry.name;
import static io.micrometer.core.instrument.Metrics.counter;
import static io.micrometer.core.instrument.Metrics.timer;
public class MessagesDynamoDb {
private static final int MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE = 25; // This was arbitrarily chosen and may be entirely too high.
private static final int DYNAMO_DB_MAX_BATCH_SIZE = 25; // This limit comes from Amazon Dynamo DB itself. It will reject batch writes larger than this.
public static final int RESULT_SET_CHUNK_SIZE = 100;
private static final String KEY_PARTITION = "H";
private static final String KEY_SORT = "S";
private static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index";
private static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U";
private static final String KEY_TYPE = "T";
private static final String KEY_RELAY = "R";
private static final String KEY_TIMESTAMP = "TS";
private static final String KEY_SOURCE = "SN";
private static final String KEY_SOURCE_UUID = "SU";
private static final String KEY_SOURCE_DEVICE = "SD";
private static final String KEY_MESSAGE = "M";
private static final String KEY_CONTENT = "C";
private static final String KEY_TTL = "E";
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Timer batchWriteItemsFirstPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "true");
private final Timer batchWriteItemsRetryPass = timer(name(getClass(), "batchWriteItems"), "firstAttempt", "false");
private final Counter batchWriteItemsUnprocessed = counter(name(getClass(), "batchWriteItemsUnprocessed"));
private final Timer storeTimer = timer(name(getClass(), "store"));
private final Timer loadTimer = timer(name(getClass(), "load"));
private final Timer deleteBySourceAndTimestamp = timer(name(getClass(), "delete", "sourceAndTimestamp"));
private final Timer deleteByGuid = timer(name(getClass(), "delete", "guid"));
private final Timer deleteByAccount = timer(name(getClass(), "delete", "account"));
private final Timer deleteByDevice = timer(name(getClass(), "delete", "device"));
private final DynamoDB dynamoDb;
private final String tableName;
private final Duration timeToLive;
public MessagesDynamoDb(DynamoDB dynamoDb, String tableName, Duration timeToLive) {
this.dynamoDb = dynamoDb;
this.tableName = tableName;
this.timeToLive = timeToLive;
}
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
storeTimer.record(() -> doInBatches(messages, (messageBatch) -> storeBatch(messageBatch, destinationAccountUuid, destinationDeviceId), DYNAMO_DB_MAX_BATCH_SIZE));
}
private void storeBatch(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid, final long destinationDeviceId) {
if (messages.size() > DYNAMO_DB_MAX_BATCH_SIZE) {
throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " execeeded with " + messages.size() + " messages");
}
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
TableWriteItems items = new TableWriteItems(tableName);
for (MessageProtos.Envelope message : messages) {
final UUID messageUuid = UUID.fromString(message.getServerGuid());
final Item item = new Item().withBinary(KEY_PARTITION, partitionKey)
.withBinary(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid))
.withBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
.withInt(KEY_TYPE, message.getType().getNumber())
.withLong(KEY_TIMESTAMP, message.getTimestamp())
.withLong(KEY_TTL, getTtlForMessage(message));
if (message.hasRelay() && message.getRelay().length() > 0) {
item.withString(KEY_RELAY, message.getRelay());
}
if (message.hasSource()) {
item.withString(KEY_SOURCE, message.getSource());
}
if (message.hasSourceUuid()) {
item.withBinary(KEY_SOURCE_UUID, convertUuidToBytes(UUID.fromString(message.getSourceUuid())));
}
if (message.hasSourceDevice()) {
item.withInt(KEY_SOURCE_DEVICE, message.getSourceDevice());
}
if (message.hasLegacyMessage()) {
item.withBinary(KEY_MESSAGE, message.getLegacyMessage().toByteArray());
}
if (message.hasContent()) {
item.withBinary(KEY_CONTENT, message.getContent().toByteArray());
}
items.addItemToPut(item);
}
executeTableWriteItemsUntilComplete(items);
}
public List<OutgoingMessageEntity> load(final UUID destinationAccountUuid, final long destinationDeviceId, final int requestedNumberOfMessagesToFetch) {
return loadTimer.record(() -> {
final int numberOfMessagesToFetch = Math.min(requestedNumberOfMessagesToFetch, RESULT_SET_CHUNK_SIZE);
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withConsistentRead(true)
.withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#sort", KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.withMaxResultSize(numberOfMessagesToFetch);
final Table table = dynamoDb.getTable(tableName);
List<OutgoingMessageEntity> messageEntities = new ArrayList<>(numberOfMessagesToFetch);
for (Item message : table.query(querySpec)) {
messageEntities.add(convertItemToOutgoingMessageEntity(message));
}
return messageEntities;
});
}
public Optional<OutgoingMessageEntity> deleteMessageByDestinationAndSourceAndTimestamp(final UUID destinationAccountUuid, final long destinationDeviceId, final String source, final long timestamp) {
return deleteBySourceAndTimestamp.record(() -> {
if (StringUtils.isEmpty(source)) {
throw new IllegalArgumentException("must specify a source");
}
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT)
.withConsistentRead(true)
.withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.withFilterExpression("#source = :source AND #timestamp = :timestamp")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#sort", KEY_SORT,
"#source", KEY_SOURCE,
"#timestamp", KEY_TIMESTAMP))
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId),
":source", source,
":timestamp", timestamp));
final Table table = dynamoDb.getTable(tableName);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, table);
});
}
public Optional<OutgoingMessageEntity> deleteMessageByDestinationAndGuid(final UUID destinationAccountUuid, final long destinationDeviceId, final UUID messageUuid) {
return deleteByGuid.record(() -> {
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withProjectionExpression(KEY_SORT)
.withConsistentRead(true)
.withKeyConditionExpression("#part = :part AND #uuid = :uuid")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)));
final Table table = dynamoDb.getTable(tableName);
final Index index = table.getIndex(LOCAL_INDEX_MESSAGE_UUID_NAME);
return deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(table, partitionKey, querySpec, index);
});
}
@Nonnull
private Optional<OutgoingMessageEntity> deleteItemsMatchingQueryAndReturnFirstOneActuallyDeleted(Table table, byte[] partitionKey, QuerySpec querySpec, QueryApi queryApi) {
Optional<OutgoingMessageEntity> result = Optional.empty();
for (Item item : queryApi.query(querySpec)) {
final byte[] rangeKeyValue = item.getBinary(KEY_SORT);
DeleteItemSpec deleteItemSpec = new DeleteItemSpec().withPrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, rangeKeyValue);
if (result.isEmpty()) {
deleteItemSpec.withReturnValues(ReturnValue.ALL_OLD);
}
final DeleteItemOutcome deleteItemOutcome = table.deleteItem(deleteItemSpec);
if (deleteItemOutcome.getItem().hasAttribute(KEY_PARTITION)) {
result = Optional.of(convertItemToOutgoingMessageEntity(deleteItemOutcome.getItem()));
}
}
return result;
}
public void deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
deleteByAccount.record(() -> {
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withHashKey(KEY_PARTITION, partitionKey)
.withProjectionExpression(KEY_SORT)
.withConsistentRead(true);
deleteRowsMatchingQuery(partitionKey, querySpec);
});
}
public void deleteAllMessagesForDevice(final UUID destinationAccountUuid, final long destinationDeviceId) {
deleteByDevice.record(() -> {
final byte[] partitionKey = convertPartitionKey(destinationAccountUuid);
final QuerySpec querySpec = new QuerySpec().withKeyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.withNameMap(Map.of("#part", KEY_PARTITION,
"#sort", KEY_SORT))
.withValueMap(Map.of(":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
.withProjectionExpression(KEY_SORT)
.withConsistentRead(true);
deleteRowsMatchingQuery(partitionKey, querySpec);
});
}
private OutgoingMessageEntity convertItemToOutgoingMessageEntity(Item message) {
final SortKey sortKey = convertSortKey(message.getBinary(KEY_SORT));
final UUID messageUuid = convertLocalIndexMessageUuidSortKey(message.getBinary(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT));
final int type = message.getInt(KEY_TYPE);
final String relay = message.getString(KEY_RELAY);
final long timestamp = message.getLong(KEY_TIMESTAMP);
final String source = message.getString(KEY_SOURCE);
final UUID sourceUuid = message.hasAttribute(KEY_SOURCE_UUID) ? convertUuidFromBytes(message.getBinary(KEY_SOURCE_UUID), "message source uuid") : null;
final int sourceDevice = message.hasAttribute(KEY_SOURCE_DEVICE) ? message.getInt(KEY_SOURCE_DEVICE) : 0;
final byte[] messageBytes = message.getBinary(KEY_MESSAGE);
final byte[] content = message.getBinary(KEY_CONTENT);
return new OutgoingMessageEntity(-1L, false, messageUuid, type, relay, timestamp, source, sourceUuid, sourceDevice, messageBytes, content, sortKey.getServerTimestamp());
}
private void deleteRowsMatchingQuery(byte[] partitionKey, QuerySpec querySpec) {
final Table table = dynamoDb.getTable(tableName);
doInBatches(table.query(querySpec), (itemBatch) -> deleteItems(partitionKey, itemBatch), DYNAMO_DB_MAX_BATCH_SIZE);
}
private void deleteItems(byte[] partitionKey, List<Item> items) {
final TableWriteItems tableWriteItems = new TableWriteItems(tableName);
items.stream().map((x) -> new PrimaryKey(KEY_PARTITION, partitionKey, KEY_SORT, x.getBinary(KEY_SORT))).forEach(tableWriteItems::addPrimaryKeyToDelete);
executeTableWriteItemsUntilComplete(tableWriteItems);
}
private void executeTableWriteItemsUntilComplete(TableWriteItems items) {
AtomicReference<BatchWriteItemOutcome> outcome = new AtomicReference<>();
batchWriteItemsFirstPass.record(() -> {
outcome.set(dynamoDb.batchWriteItem(items));
});
int attemptCount = 0;
while (!outcome.get().getUnprocessedItems().isEmpty() && attemptCount < MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE) {
batchWriteItemsRetryPass.record(() -> {
outcome.set(dynamoDb.batchWriteItemUnprocessed(outcome.get().getUnprocessedItems()));
});
++attemptCount;
}
if (!outcome.get().getUnprocessedItems().isEmpty()) {
logger.error("Attempt count ({}) reached max ({}}) before applying all batch writes to dynamo. {} unprocessed items remain.", attemptCount, MAX_ATTEMPTS_TO_SAVE_BATCH_WRITE, outcome.get().getUnprocessedItems().size());
batchWriteItemsUnprocessed.increment(outcome.get().getUnprocessedItems().size());
}
}
private long getTtlForMessage(MessageProtos.Envelope message) {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
}
private static <T> void doInBatches(final Iterable<T> items, final Consumer<List<T>> action, final int batchSize) {
List<T> batch = new ArrayList<>(batchSize);
for (T item : items) {
batch.add(item);
if (batch.size() == batchSize) {
action.accept(batch);
batch.clear();
}
}
if (!batch.isEmpty()) {
action.accept(batch);
}
}
private static byte[] convertPartitionKey(final UUID destinationAccountUuid) {
return convertUuidToBytes(destinationAccountUuid);
}
private static UUID convertPartitionKey(final byte[] bytes) {
return convertUuidFromBytes(bytes, "partition key");
}
private static byte[] convertSortKey(final long destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]);
byteBuffer.putLong(destinationDeviceId);
byteBuffer.putLong(serverTimestamp);
byteBuffer.putLong(messageUuid.getMostSignificantBits());
byteBuffer.putLong(messageUuid.getLeastSignificantBits());
return byteBuffer.array();
}
private static byte[] convertDestinationDeviceIdToSortKeyPrefix(final long destinationDeviceId) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
byteBuffer.putLong(destinationDeviceId);
return byteBuffer.array();
}
private static SortKey convertSortKey(final byte[] bytes) {
if (bytes.length != 32) {
throw new IllegalArgumentException("unexpected sort key byte length");
}
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
final long destinationDeviceId = byteBuffer.getLong();
final long serverTimestamp = byteBuffer.getLong();
final long mostSigBits = byteBuffer.getLong();
final long leastSigBits = byteBuffer.getLong();
return new SortKey(destinationDeviceId, serverTimestamp, new UUID(mostSigBits, leastSigBits));
}
private static byte[] convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {
return convertUuidToBytes(messageUuid);
}
private static UUID convertLocalIndexMessageUuidSortKey(final byte[] bytes) {
return convertUuidFromBytes(bytes, "local index message uuid sort key");
}
private static byte[] convertUuidToBytes(final UUID uuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
return byteBuffer.array();
}
private static UUID convertUuidFromBytes(final byte[] bytes, final String name) {
if (bytes.length != 16) {
throw new IllegalArgumentException("unexpected " + name + " byte length; was " + bytes.length + " but expected 16");
}
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
final long mostSigBits = byteBuffer.getLong();
final long leastSigBits = byteBuffer.getLong();
return new UUID(mostSigBits, leastSigBits);
}
private static final class SortKey {
private final long destinationDeviceId;
private final long serverTimestamp;
private final UUID messageUuid;
public SortKey(long destinationDeviceId, long serverTimestamp, UUID messageUuid) {
this.destinationDeviceId = destinationDeviceId;
this.serverTimestamp = serverTimestamp;
this.messageUuid = messageUuid;
}
public long getDestinationDeviceId() {
return destinationDeviceId;
}
public long getServerTimestamp() {
return serverTimestamp;
}
public UUID getMessageUuid() {
return messageUuid;
}
}
}

View File

@@ -12,6 +12,7 @@ import com.codahale.metrics.SharedMetricRegistries;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -26,22 +27,28 @@ import static com.codahale.metrics.MetricRegistry.name;
public class MessagesManager {
private static final String READ_DYNAMODB_EXPERIMENT = "messages_dynamodb_read";
private static final String WRITE_DYNAMODB_EXPERIMENT = "messages_dynamodb_write";
private static final String DISABLE_RDS_EXPERIMENT = "messages_disable_rds";
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter cacheHitByIdMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitById" ));
private static final Meter cacheMissByIdMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissById" ));
private static final Meter cacheHitByNameMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByName" ));
private static final Meter cacheMissByNameMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByName"));
private static final Meter cacheHitByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheHitByGuid" ));
private static final Meter cacheMissByGuidMeter = metricRegistry.meter(name(MessagesManager.class, "cacheMissByGuid"));
private final Messages messages;
private final MessagesCache messagesCache;
private final Messages messages;
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
private final PushLatencyManager pushLatencyManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
public MessagesManager(Messages messages, MessagesCache messagesCache, PushLatencyManager pushLatencyManager) {
this.messages = messages;
this.messagesCache = messagesCache;
public MessagesManager(Messages messages, MessagesDynamoDb messagesDynamoDb, MessagesCache messagesCache, PushLatencyManager pushLatencyManager, ExperimentEnrollmentManager experimentEnrollmentManager) {
this.messages = messages;
this.messagesDynamoDb = messagesDynamoDb;
this.messagesCache = messagesCache;
this.pushLatencyManager = pushLatencyManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
public void insert(UUID destinationUuid, long destinationDevice, Envelope message) {
@@ -63,39 +70,58 @@ public class MessagesManager {
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent, final boolean cachedMessagesOnly) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
List<OutgoingMessageEntity> messages = cachedMessagesOnly ? new ArrayList<>() : this.messages.load(destination, destinationDevice);
List<OutgoingMessageEntity> messageList = new ArrayList<>();
if (messages.size() < Messages.RESULT_SET_CHUNK_SIZE) {
messages.addAll(messagesCache.get(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
if (!cachedMessagesOnly && !experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
messageList.addAll(messages.load(destination, destinationDevice));
}
return new OutgoingMessageEntityList(messages, messages.size() >= Messages.RESULT_SET_CHUNK_SIZE);
if (messageList.size() < Messages.RESULT_SET_CHUNK_SIZE && !cachedMessagesOnly && experimentEnrollmentManager.isEnrolled(destinationUuid, READ_DYNAMODB_EXPERIMENT)) {
messageList.addAll(messagesDynamoDb.load(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messageList.size()));
}
if (messageList.size() < Messages.RESULT_SET_CHUNK_SIZE) {
messageList.addAll(messagesCache.get(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messageList.size()));
}
return new OutgoingMessageEntityList(messageList, messageList.size() >= Messages.RESULT_SET_CHUNK_SIZE);
}
public void clear(String destination, UUID destinationUuid) {
// TODO Remove this null check in a fully-UUID-ified world
if (destinationUuid != null) {
this.messagesCache.clear(destinationUuid);
messagesCache.clear(destinationUuid);
if (experimentEnrollmentManager.isEnrolled(destinationUuid, WRITE_DYNAMODB_EXPERIMENT)) {
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid);
}
if (!experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
messages.clear(destination);
}
} else {
messages.clear(destination);
}
this.messages.clear(destination);
}
public void clear(String destination, UUID destinationUuid, long deviceId) {
// TODO Remove this null check in a fully-UUID-ified world
if (destinationUuid != null) {
this.messagesCache.clear(destinationUuid, deviceId);
messagesCache.clear(destinationUuid, deviceId);
if (experimentEnrollmentManager.isEnrolled(destinationUuid, WRITE_DYNAMODB_EXPERIMENT)) {
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId);
}
if (!experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
messages.clear(destination, deviceId);
}
this.messages.clear(destination, deviceId);
}
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp)
{
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long destinationDevice, String source, long timestamp) {
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, destinationDevice, source, timestamp);
if (!removed.isPresent()) {
removed = this.messages.remove(destination, destinationDevice, source, timestamp);
if (removed.isEmpty()) {
if (experimentEnrollmentManager.isEnrolled(destinationUuid, WRITE_DYNAMODB_EXPERIMENT)) {
removed = messagesDynamoDb.deleteMessageByDestinationAndSourceAndTimestamp(destinationUuid, destinationDevice, source, timestamp);
}
if (removed.isEmpty() && !experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
removed = messages.remove(destination, destinationDevice, source, timestamp);
}
cacheMissByNameMeter.mark();
} else {
cacheHitByNameMeter.mark();
@@ -107,8 +133,13 @@ public class MessagesManager {
public Optional<OutgoingMessageEntity> delete(String destination, UUID destinationUuid, long deviceId, UUID guid) {
Optional<OutgoingMessageEntity> removed = messagesCache.remove(destinationUuid, deviceId, guid);
if (!removed.isPresent()) {
removed = this.messages.remove(destination, guid);
if (removed.isEmpty()) {
if (experimentEnrollmentManager.isEnrolled(destinationUuid, WRITE_DYNAMODB_EXPERIMENT)) {
removed = messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, deviceId, guid);
}
if (removed.isEmpty() && !experimentEnrollmentManager.isEnrolled(destinationUuid, DISABLE_RDS_EXPERIMENT)) {
removed = messages.remove(destination, guid);
}
cacheMissByGuidMeter.mark();
} else {
cacheHitByGuidMeter.mark();
@@ -117,18 +148,17 @@ public class MessagesManager {
return removed;
}
public void delete(String destination, UUID destinationUuid, long deviceId, long id, boolean cached) {
if (cached) {
messagesCache.remove(destinationUuid, deviceId, id);
cacheHitByIdMeter.mark();
} else {
this.messages.remove(destination, id);
cacheMissByIdMeter.mark();
}
@Deprecated
public void delete(String destination, long id) {
messages.remove(destination, id);
}
public void persistMessages(final String destination, final UUID destinationUuid, final long destinationDeviceId, final List<Envelope> messages) {
this.messages.store(messages, destination, destinationDeviceId);
if (experimentEnrollmentManager.isEnrolled(destinationUuid, WRITE_DYNAMODB_EXPERIMENT)) {
messagesDynamoDb.store(messages, destinationUuid, destinationDeviceId);
} else {
this.messages.store(messages, destination, destinationDeviceId);
}
messagesCache.remove(destinationUuid, destinationDeviceId, messages.stream().map(message -> UUID.fromString(message.getServerGuid())).collect(Collectors.toList()));
}

View File

@@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -145,7 +146,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
if (throwable == null) {
if (isSuccessResponse(response)) {
if (storedMessageInfo.isPresent()) {
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().getGuid());
}
if (message.getType() != Envelope.Type.RECEIPT) {
@@ -252,13 +253,17 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final Envelope envelope = builder.build();
if (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient) {
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), message.getId(), message.isCached());
if (message.getGuid() == null || (envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE && isDesktopClient)) {
if (message.getGuid() == null) {
messagesManager.delete(account.getNumber(), message.getId()); // TODO(ehren): Remove once the message DB is gone.
} else {
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), message.getGuid());
}
discardedMessagesMeter.mark();
sendFutures[i] = CompletableFuture.completedFuture(null);
} else {
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())));
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getGuid())));
}
}
@@ -307,12 +312,14 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}
private static class StoredMessageInfo {
private final long id;
private final boolean cached;
private final UUID guid;
private StoredMessageInfo(long id, boolean cached) {
this.id = id;
this.cached = cached;
public StoredMessageInfo(UUID guid) {
this.guid = guid;
}
public UUID getGuid() {
return guid;
}
}
}

View File

@@ -5,6 +5,10 @@
package org.whispersystems.textsecuregcm.workers;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
@@ -17,6 +21,7 @@ import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
@@ -26,10 +31,12 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
@@ -84,17 +91,28 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, configuration.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
ClientResources redisClusterClientResources = ClientResources.builder().build();
AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder
.standard()
.withRegion(configuration.getMessageDynamoDbConfiguration().getRegion())
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getMessageDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
.withRequestTimeout((int) configuration.getMessageDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
.withCredentials(InstanceProfileCredentialsProvider.getInstance());
DynamoDB messageDynamoDb = new DynamoDB(clientBuilder.build());
FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", configuration.getCacheClusterConfiguration(), redisClusterClientResources);
ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle().executorService(name(getClass(), "keyspaceNotification-%d")).maxThreads(4).build();
DynamicConfigurationManager dynamicConfigurationManager = new DynamicConfigurationManager(configuration.getAppConfig().getApplication(), configuration.getAppConfig().getEnvironment(), configuration.getAppConfig().getConfigurationName());
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
Accounts accounts = new Accounts(accountDatabase);
Usernames usernames = new Usernames(accountDatabase);
Profiles profiles = new Profiles(accountDatabase);
ReservedUsernames reservedUsernames = new ReservedUsernames(accountDatabase);
Keys keys = new Keys(accountDatabase, configuration.getAccountsDatabaseConfiguration().getKeyOperationRetryConfiguration());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, configuration.getMessageDynamoDbConfiguration().getTableName(), configuration.getMessageDynamoDbConfiguration().getTimeToLive());
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
@@ -105,7 +123,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
DirectoryManager directory = new DirectoryManager(redisClient );
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, pushLatencyManager);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, new ExperimentEnrollmentManager(dynamicConfigurationManager));
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keys, messagesManager, usernamesManager, profilesManager);
for (String user: users) {