Delete messages lazily on account and device deletion to prevent timeouts when deleting accounts/devices with large queues

This commit is contained in:
Jonathan Klabunde Tomer
2024-06-04 12:16:43 -07:00
committed by GitHub
parent 4ef6266e8f
commit 01743e5c88
15 changed files with 415 additions and 176 deletions

View File

@@ -394,6 +394,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getMessages().getTableName(),
config.getDynamoDbTables().getMessages().getExpiration(),
dynamicConfigurationManager,
messageDeletionAsyncExecutor);
RemoteConfigs remoteConfigs = new RemoteConfigs(dynamoDbClient,
config.getDynamoDbTables().getRemoteConfig().getTableName());

View File

@@ -67,6 +67,10 @@ public class DynamicConfiguration {
@Valid
DynamicMetricsConfiguration metricsConfiguration = new DynamicMetricsConfiguration(false);
@JsonProperty
@Valid
DynamicMessagesConfiguration messagesConfiguration = new DynamicMessagesConfiguration();
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@@ -121,4 +125,8 @@ public class DynamicConfiguration {
return metricsConfiguration;
}
public DynamicMessagesConfiguration getMessagesConfiguration() {
return messagesConfiguration;
}
}

View File

@@ -0,0 +1,25 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import java.util.List;
import javax.validation.constraints.NotNull;
public record DynamicMessagesConfiguration(@NotNull List<DynamoKeyScheme> dynamoKeySchemes) {
public enum DynamoKeyScheme {
TRADITIONAL,
LAZY_DELETION;
}
public DynamicMessagesConfiguration() {
this(List.of(DynamoKeyScheme.TRADITIONAL));
}
public DynamoKeyScheme writeKeyScheme() {
return dynamoKeySchemes().getLast();
}
}

View File

@@ -724,7 +724,7 @@ public class MessageController {
return messagesManager.getMessagesForDevice(
auth.getAccount().getUuid(),
auth.getAuthenticatedDevice().getId(),
auth.getAuthenticatedDevice(),
false)
.map(messagesAndHasMore -> {
Stream<Envelope> envelopes = messagesAndHasMore.first().stream();
@@ -768,7 +768,7 @@ public class MessageController {
public CompletableFuture<Response> removePendingMessage(@ReadOnly @Auth AuthenticatedAccount auth, @PathParam("uuid") UUID uuid) {
return messagesManager.delete(
auth.getAccount().getUuid(),
auth.getAuthenticatedDevice().getId(),
auth.getAuthenticatedDevice(),
uuid,
null)
.thenAccept(maybeDeletedMessage -> {

View File

@@ -161,8 +161,13 @@ public class MessagePersister implements Managed {
logger.error("No account record found for account {}", accountUuid);
continue;
}
final Optional<Device> maybeDevice = maybeAccount.flatMap(account -> account.getDevice(deviceId));
if (maybeDevice.isEmpty()) {
logger.error("Account {} does not have a device with id {}", accountUuid, deviceId);
continue;
}
try {
persistQueue(maybeAccount.get(), deviceId);
persistQueue(maybeAccount.get(), maybeDevice.get());
} catch (final Exception e) {
persistQueueExceptionMeter.increment();
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
@@ -180,8 +185,9 @@ public class MessagePersister implements Managed {
}
@VisibleForTesting
void persistQueue(final Account account, final byte deviceId) throws MessagePersistenceException {
void persistQueue(final Account account, final Device device) throws MessagePersistenceException {
final UUID accountUuid = account.getUuid();
final byte deviceId = device.getId();
final Timer.Sample sample = Timer.start();
@@ -196,7 +202,7 @@ public class MessagePersister implements Managed {
do {
messages = messagesCache.getMessagesToPersist(accountUuid, deviceId, MESSAGE_BATCH_LIMIT);
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, deviceId, messages);
int messagesRemovedFromCache = messagesManager.persistMessages(accountUuid, device, messages);
messageCount += messages.size();
if (messagesRemovedFromCache == 0) {
@@ -246,7 +252,7 @@ public class MessagePersister implements Managed {
.filter(d -> !d.isPrimary())
.flatMap(d ->
messagesManager
.getEarliestUndeliveredTimestampForDevice(account.getUuid(), d.getId())
.getEarliestUndeliveredTimestampForDevice(account.getUuid(), d)
.map(t -> Tuples.of(d, t)))
.sort(Comparator.comparing(Tuple2::getT2))
.map(Tuple2::getT1)

View File

@@ -11,6 +11,9 @@ import static io.micrometer.core.instrument.Metrics.timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
import java.time.Duration;
@@ -26,12 +29,16 @@ import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration.DynamoKeyScheme;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
import reactor.util.function.Tuple2;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -62,46 +69,53 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final Timer storeTimer = timer(name(getClass(), "store"));
private final String DELETE_BY_ACCOUNT_TIMER_NAME = name(getClass(), "delete", "account");
private final String DELETE_BY_DEVICE_TIMER_NAME = name(getClass(), "delete", "device");
private final String MESSAGES_STORED_BY_SCHEME_COUNTER_NAME = name(getClass(), "messagesStored");
private final String MESSAGES_LOADED_BY_SCHEME_COUNTER_NAME = name(getClass(), "messagesLoaded");
private final String MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME = name(getClass(), "messagesDeleted");
private final DynamoDbAsyncClient dbAsyncClient;
private final String tableName;
private final Duration timeToLive;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfig;
private final ExecutorService messageDeletionExecutor;
private final Scheduler messageDeletionScheduler;
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName,
Duration timeToLive, ExecutorService messageDeletionExecutor) {
Duration timeToLive, DynamicConfigurationManager<DynamicConfiguration> dynamicConfig, ExecutorService messageDeletionExecutor) {
super(dynamoDb);
this.dbAsyncClient = dynamoDbAsyncClient;
this.tableName = tableName;
this.timeToLive = timeToLive;
this.dynamicConfig = dynamicConfig;
this.messageDeletionExecutor = messageDeletionExecutor;
this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor);
}
public void store(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid,
final byte destinationDeviceId) {
storeTimer.record(() -> writeInBatches(messages, (messageBatch) -> storeBatch(messageBatch, destinationAccountUuid, destinationDeviceId)));
final Device destinationDevice) {
storeTimer.record(() -> writeInBatches(messages, (messageBatch) -> storeBatch(messageBatch, destinationAccountUuid, destinationDevice)));
}
private void storeBatch(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid,
final byte destinationDeviceId) {
final Device destinationDevice) {
final byte destinationDeviceId = destinationDevice.getId();
if (messages.size() > DYNAMO_DB_MAX_BATCH_SIZE) {
throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " exceeded with " + messages.size() + " messages");
}
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final DynamoKeyScheme scheme = dynamicConfig.getConfiguration().getMessagesConfiguration().writeKeyScheme();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme);
List<WriteRequest> writeItems = new ArrayList<>();
for (MessageProtos.Envelope message : messages) {
final UUID messageUuid = UUID.fromString(message.getServerGuid());
final ImmutableMap.Builder<String, AttributeValue> item = ImmutableMap.<String, AttributeValue>builder()
.put(KEY_PARTITION, partitionKey)
.put(KEY_SORT, convertSortKey(destinationDeviceId, message.getServerTimestamp(), messageUuid))
.put(KEY_SORT, convertSortKey(destinationDevice.getId(), message.getServerTimestamp(), messageUuid, scheme))
.put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
.put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message)))
.put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build());
@@ -112,22 +126,43 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}
executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems));
Metrics.counter(MESSAGES_STORED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(writeItems.size());
}
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final byte destinationDeviceId,
final Integer limit) {
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
return Flux.concat(
dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()
.stream()
.map(scheme -> load(destinationAccountUuid, device, limit, scheme))
.toList())
.map(messageAndScheme -> {
Metrics.counter(MESSAGES_LOADED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", messageAndScheme.getT2().name())).increment();
return messageAndScheme.getT1();
});
}
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
private Publisher<Tuple2<MessageProtos.Envelope, DynamoKeyScheme>> load(final UUID destinationAccountUuid, final Device device, final Integer limit, final DynamoKeyScheme scheme) {
final byte destinationDeviceId = device.getId();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device, scheme);
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
.tableName(tableName)
.consistentRead(true)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)));
.consistentRead(true);
queryRequestBuilder = switch (scheme) {
case TRADITIONAL -> queryRequestBuilder
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId, scheme)));
case LAZY_DELETION -> queryRequestBuilder
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
};
if (limit != null) {
// some callers dont take advantage of reactive streams, so we want to support limiting the fetch size. Otherwise,
@@ -146,13 +181,25 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return null;
}
})
.filter(Predicate.not(Objects::isNull));
.filter(Predicate.not(Objects::isNull))
.map(m -> Tuples.of(m, scheme));
}
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
final UUID destinationAccountUuid, final UUID messageUuid) {
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) {
return dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()
.stream()
.map(scheme -> deleteMessageByDestinationAndGuid(destinationAccountUuid, destinationDevice, messageUuid, scheme))
// this combines the futures by producing a future that returns an arbitrary nonempty
// result if there is one, which should be OK because only one of the keying schemes
// should produce a nonempty result for any given message uuid
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
.get(); // there is always at least one scheme
}
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
private CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, DynamoKeyScheme scheme) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
@@ -179,6 +226,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.mapNotNull(deleteItemResponse -> {
try {
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
Metrics.counter(MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment();
return convertItemToEnvelope(deleteItemResponse.attributes());
}
} catch (final InvalidProtocolBufferException e) {
@@ -193,10 +241,21 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
final byte destinationDeviceId, final UUID messageUuid, final long serverTimestamp) {
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
return dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()
.stream()
.map(scheme -> deleteMessage(destinationAccountUuid, destinationDevice, messageUuid, serverTimestamp, scheme))
// this combines the futures by producing a future that returns an arbitrary nonempty
// result if there is one, which should be OK because only one of the keying schemes
// should produce a nonempty result for any given message uuid
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
.orElseThrow(); // there is always at least one scheme
}
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final AttributeValue sortKey = convertSortKey(destinationDeviceId, serverTimestamp, messageUuid);
private CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp, final DynamoKeyScheme scheme) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme);
final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid, scheme);
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey))
@@ -216,10 +275,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}, messageDeletionExecutor);
}
// Deletes all messages stored for the supplied account that were stored under the traditional (uuid+device id) keying scheme.
// Messages stored under the lazy-message-deletion keying scheme will not be affected.
public CompletableFuture<Void> deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
final Timer.Sample sample = Timer.start();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, null, DynamoKeyScheme.TRADITIONAL);
return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
@@ -243,10 +304,13 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.toFuture();
}
// Deletes all messages stored for the supplied account and device that were stored under the
// traditional (uuid+device id) keying scheme. Messages stored under the lazy-message-deletion
// keying scheme will not be affected.
public CompletableFuture<Void> deleteAllMessagesForDevice(final UUID destinationAccountUuid,
final byte destinationDeviceId) {
final Timer.Sample sample = Timer.start();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, null, DynamoKeyScheme.TRADITIONAL);
return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
@@ -256,7 +320,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId)))
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId, DynamoKeyScheme.TRADITIONAL)))
.projectionExpression(KEY_SORT)
.consistentRead(true)
.build())
@@ -285,26 +349,38 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
}
private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid) {
return AttributeValues.fromUUID(destinationAccountUuid);
private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid, final Device destinationDevice, final DynamoKeyScheme scheme) {
return switch (scheme) {
case TRADITIONAL -> AttributeValues.fromUUID(destinationAccountUuid);
case LAZY_DELETION -> {
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits());
byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits());
byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId());
yield AttributeValues.fromByteBuffer(byteBuffer.flip());
}
};
}
private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp,
final UUID messageUuid) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[32]);
// for compatibility - destinationDeviceId was previously `long`
byteBuffer.putLong(destinationDeviceId);
final UUID messageUuid, final DynamoKeyScheme scheme) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(32);
if (scheme == DynamoKeyScheme.TRADITIONAL) {
// for compatibility - destinationDeviceId was previously `long`
byteBuffer.putLong(destinationDeviceId);
}
byteBuffer.putLong(serverTimestamp);
byteBuffer.putLong(messageUuid.getMostSignificantBits());
byteBuffer.putLong(messageUuid.getLeastSignificantBits());
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final byte destinationDeviceId) {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[8]);
// for compatibility - destinationDeviceId was previously `long`
byteBuffer.putLong(destinationDeviceId);
return AttributeValues.fromByteBuffer(byteBuffer.flip());
private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final byte destinationDeviceId, final DynamoKeyScheme scheme) {
return switch (scheme) {
case TRADITIONAL -> AttributeValues.fromByteBuffer(ByteBuffer.allocate(8).putLong(destinationDeviceId).flip());
case LAZY_DELETION -> AttributeValues.b(new byte[0]);
};
}
private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {

View File

@@ -67,7 +67,7 @@ public class MessagesManager {
return messagesCache.hasMessages(destinationUuid, destinationDevice);
}
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, byte destinationDevice,
public Mono<Pair<List<Envelope>, Boolean>> getMessagesForDevice(UUID destinationUuid, Device destinationDevice,
boolean cachedMessagesOnly) {
return Flux.from(
@@ -77,25 +77,25 @@ public class MessagesManager {
.map(envelopes -> new Pair<>(envelopes, envelopes.size() >= RESULT_SET_CHUNK_SIZE));
}
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, byte destinationDevice,
public Publisher<Envelope> getMessagesForDeviceReactive(UUID destinationUuid, Device destinationDevice,
final boolean cachedMessagesOnly) {
return getMessagesForDevice(destinationUuid, destinationDevice, null, cachedMessagesOnly);
}
private Publisher<Envelope> getMessagesForDevice(UUID destinationUuid, byte destinationDevice,
private Publisher<Envelope> getMessagesForDevice(UUID destinationUuid, Device destinationDevice,
@Nullable Integer limit, final boolean cachedMessagesOnly) {
final Publisher<Envelope> dynamoPublisher =
cachedMessagesOnly ? Flux.empty() : messagesDynamoDb.load(destinationUuid, destinationDevice, limit);
final Publisher<Envelope> cachePublisher = messagesCache.get(destinationUuid, destinationDevice);
final Publisher<Envelope> cachePublisher = messagesCache.get(destinationUuid, destinationDevice.getId());
return Flux.concat(dynamoPublisher, cachePublisher)
.name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME)
.tap(Micrometer.metrics(Metrics.globalRegistry));
}
public Mono<Long> getEarliestUndeliveredTimestampForDevice(UUID destinationUuid, byte destinationDevice) {
public Mono<Long> getEarliestUndeliveredTimestampForDevice(UUID destinationUuid, Device destinationDevice) {
return Mono.from(messagesDynamoDb.load(destinationUuid, destinationDevice, 1)).map(Envelope::getServerTimestamp);
}
@@ -111,9 +111,9 @@ public class MessagesManager {
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId));
}
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, byte destinationDeviceId, UUID guid,
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, Device destinationDevice, UUID guid,
@Nullable Long serverTimestamp) {
return messagesCache.remove(destinationUuid, destinationDeviceId, guid)
return messagesCache.remove(destinationUuid, destinationDevice.getId(), guid)
.thenComposeAsync(removed -> {
if (removed.isPresent()) {
@@ -121,9 +121,9 @@ public class MessagesManager {
}
if (serverTimestamp == null) {
return messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, guid);
return messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, destinationDevice, guid);
} else {
return messagesDynamoDb.deleteMessage(destinationUuid, destinationDeviceId, guid, serverTimestamp);
return messagesDynamoDb.deleteMessage(destinationUuid, destinationDevice, guid, serverTimestamp);
}
}, messageDeletionExecutor);
@@ -134,20 +134,20 @@ public class MessagesManager {
*/
public int persistMessages(
final UUID destinationUuid,
final byte destinationDeviceId,
final Device destinationDevice,
final List<Envelope> messages) {
final List<Envelope> nonEphemeralMessages = messages.stream()
.filter(envelope -> !envelope.getEphemeral())
.collect(Collectors.toList());
messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDeviceId);
messagesDynamoDb.store(nonEphemeralMessages, destinationUuid, destinationDevice);
final List<UUID> messageGuids = messages.stream().map(message -> UUID.fromString(message.getServerGuid()))
.collect(Collectors.toList());
int messagesRemovedFromCache = 0;
try {
messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDeviceId, messageGuids)
messagesRemovedFromCache = messagesCache.remove(destinationUuid, destinationDevice.getId(), messageGuids)
.get(30, TimeUnit.SECONDS).size();
PERSIST_MESSAGE_COUNTER.increment(nonEphemeralMessages.size());

View File

@@ -228,7 +228,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final CompletableFuture<Void> result;
if (isSuccessResponse(response)) {
result = messagesManager.delete(auth.getAccount().getUuid(), device.getId(),
result = messagesManager.delete(auth.getAccount().getUuid(), device,
storedMessageInfo.guid(), storedMessageInfo.serverTimestamp())
.thenApply(ignored -> null);
@@ -355,7 +355,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
private void sendMessages(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
final Publisher<Envelope> messages =
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device, cachedMessagesOnly);
final AtomicBoolean hasErrored = new AtomicBoolean();
@@ -414,7 +414,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
if (envelope.getStory() && !client.shouldDeliverStories()) {
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp());
messagesManager.delete(auth.getAccount().getUuid(), device, messageGuid, envelope.getServerTimestamp());
return CompletableFuture.completedFuture(null);
} else {

View File

@@ -172,6 +172,7 @@ record CommandDependencies(
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
dynamicConfigurationManager,
messageDeletionExecutor);
FaultTolerantRedisCluster messagesCluster = configuration.getMessageCacheConfiguration()
.getRedisClusterConfiguration().build("messages", redisClientResourcesBuilder);