diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index ce1963e4e..0e14961b5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -5,8 +5,8 @@ package org.whispersystems.textsecuregcm.storage; -import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import static io.micrometer.core.instrument.Metrics.timer; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -30,9 +30,6 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.util.AttributeValues; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -51,9 +48,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { @VisibleForTesting static final String KEY_SORT = "S"; - @VisibleForTesting - static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index"; - @VisibleForTesting static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U"; @@ -69,7 +63,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { private final String tableName; private final Duration timeToLive; private final ExecutorService messageDeletionExecutor; - private final Scheduler messageDeletionScheduler; private final ExperimentEnrollmentManager experimentEnrollmentManager; private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class); @@ -84,7 +77,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { this.timeToLive = timeToLive; this.messageDeletionExecutor = messageDeletionExecutor; - this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor); this.experimentEnrollmentManager = experimentEnrollmentManager; } @@ -164,48 +156,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { .filter(Predicate.not(Objects::isNull)); } - public CompletableFuture> deleteMessageByDestinationAndGuid( - final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); - final QueryRequest queryRequest = QueryRequest.builder() - .tableName(tableName) - .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME) - .projectionExpression(KEY_SORT) - .consistentRead(true) - .keyConditionExpression("#part = :part AND #uuid = :uuid") - .expressionAttributeNames(Map.of( - "#part", KEY_PARTITION, - "#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT)) - .expressionAttributeValues(Map.of( - ":part", partitionKey, - ":uuid", convertLocalIndexMessageUuidSortKey(messageUuid))) - .build(); - - // because we are filtering on message UUID, this query should return at most one item, - // but it’s simpler to handle the full stream and return the “last” item - return Flux.from(dbAsyncClient.queryPaginator(queryRequest).items()) - .flatMap(item -> Mono.fromCompletionStage(dbAsyncClient.deleteItem(DeleteItemRequest.builder() - .tableName(tableName) - .key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, - AttributeValues.fromByteArray(item.get(KEY_SORT).b().asByteArray()))) - .returnValues(ReturnValue.ALL_OLD) - .build()))) - .mapNotNull(deleteItemResponse -> { - try { - if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) { - return convertItemToEnvelope(deleteItemResponse.attributes(), experimentEnrollmentManager); - } - } catch (final InvalidProtocolBufferException e) { - logger.error("Failed to parse envelope", e); - } - return null; - }) - .map(Optional::ofNullable) - .subscribeOn(messageDeletionScheduler) - .last(Optional.empty()) // if the flux is empty, last() will throw without a default - .toFuture(); - } - public CompletableFuture> deleteMessage(final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 1979783b6..7abf50104 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -28,7 +28,6 @@ import org.reactivestreams.Publisher; import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java index 4a77083b6..1eb58a3a4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/DynamoDbExtensionSchema.java @@ -8,8 +8,8 @@ package org.whispersystems.textsecuregcm.storage; import java.util.Collections; import java.util.List; import org.whispersystems.textsecuregcm.backup.BackupsDb; -import org.whispersystems.textsecuregcm.scheduler.JobScheduler; import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSamples; +import org.whispersystems.textsecuregcm.scheduler.JobScheduler; import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceChecks; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex; @@ -246,20 +246,8 @@ public final class DynamoDbExtensionSchema { MessagesDynamoDb.KEY_SORT, List.of( AttributeDefinition.builder().attributeName(MessagesDynamoDb.KEY_PARTITION).attributeType(ScalarAttributeType.B).build(), - AttributeDefinition.builder().attributeName(MessagesDynamoDb.KEY_SORT).attributeType(ScalarAttributeType.B).build(), - AttributeDefinition.builder().attributeName(MessagesDynamoDb.LOCAL_INDEX_MESSAGE_UUID_KEY_SORT) - .attributeType(ScalarAttributeType.B).build()), - List.of(), - List.of(LocalSecondaryIndex.builder() - .indexName(MessagesDynamoDb.LOCAL_INDEX_MESSAGE_UUID_NAME) - .keySchema( - KeySchemaElement.builder().attributeName(MessagesDynamoDb.KEY_PARTITION).keyType(KeyType.HASH).build(), - KeySchemaElement.builder() - .attributeName(MessagesDynamoDb.LOCAL_INDEX_MESSAGE_UUID_KEY_SORT) - .keyType(KeyType.RANGE) - .build()) - .projection(Projection.builder().projectionType(ProjectionType.KEYS_ONLY).build()) - .build())), + AttributeDefinition.builder().attributeName(MessagesDynamoDb.KEY_SORT).attributeType(ScalarAttributeType.B).build()), + List.of(), List.of()), ONETIME_DONATIONS("onetime_donations_test", OneTimeDonationsManager.KEY_PAYMENT_ID, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index cfe2a8d20..3b5b39ba8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -11,9 +11,9 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyByte; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNotNull; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -346,15 +346,16 @@ class MessagePersisterTest { .thenReturn(Flux.concat( Flux.fromIterable(persistedMessages), Flux.fromIterable(cachedMessages))); - when(messagesManager.delete(any(), any(), any(), any())) + when(messagesManager.delete(any(), any(), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(Optional.empty())); assertTimeoutPreemptively(Duration.ofSeconds(10), () -> messagePersister.persistNextQueues(Clock.systemUTC().instant())); verify(messagesManager, times(expectedClearedGuids.size())) - .delete(eq(DESTINATION_ACCOUNT_UUID), eq(primary), argThat(expectedClearedGuids::contains), isNotNull()); - verify(messagesManager, never()).delete(any(), any(), argThat(guid -> !expectedClearedGuids.contains(guid)), any()); + .delete(eq(DESTINATION_ACCOUNT_UUID), eq(primary), argThat(expectedClearedGuids::contains), anyLong()); + verify(messagesManager, never()) + .delete(any(), any(), argThat(guid -> !expectedClearedGuids.contains(guid)), anyLong()); final List queuesToPersist = messagesCache.getQueuesToPersist(SlotHash.getSlot(queueName), Clock.systemUTC().instant(), 1); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java index 5f17c0437..c862824a3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDbTest.java @@ -12,7 +12,6 @@ import com.google.protobuf.ByteString; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -188,47 +187,6 @@ class MessagesDynamoDbTest { .verify(); } - @Test - void testDeleteMessageByDestinationAndGuid() throws Exception { - final UUID destinationUuid = UUID.randomUUID(); - final UUID secondDestinationUuid = UUID.randomUUID(); - final Device primary = DevicesHelper.createDevice((byte) 1); - final Device device2 = DevicesHelper.createDevice((byte) 2); - - messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary); - messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, primary); - messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, device2); - - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) - .element(0).isEqualTo(MESSAGE1); - assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1) - .element(0).isEqualTo(MESSAGE3); - assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1).element(0).isEqualTo(MESSAGE2); - - final Optional deletedMessage = messagesDynamoDb.deleteMessageByDestinationAndGuid( - secondDestinationUuid, primary, - UUID.fromString(MESSAGE2.getServerGuid())).get(5, TimeUnit.SECONDS); - - assertThat(deletedMessage).isPresent(); - - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1) - .element(0).isEqualTo(MESSAGE1); - assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .hasSize(1) - .element(0).isEqualTo(MESSAGE3); - assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull() - .isEmpty(); - - final Optional alreadyDeletedMessage = messagesDynamoDb.deleteMessageByDestinationAndGuid( - secondDestinationUuid, primary, - UUID.fromString(MESSAGE2.getServerGuid())).get(5, TimeUnit.SECONDS); - - assertThat(alreadyDeletedMessage).isNotPresent(); - - } - @Test void testDeleteSingleMessage() throws Exception { final UUID destinationUuid = UUID.randomUUID(); @@ -274,19 +232,14 @@ class MessagesDynamoDbTest { final Device primary = DevicesHelper.createDevice((byte) 1); primary.setCreated(System.currentTimeMillis()); - messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, primary); + messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2), destinationUuid, primary); assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) - .as("load should return all messages stored").containsOnly(MESSAGE1, MESSAGE2, MESSAGE3); + .as("load should return all messages stored").containsOnly(MESSAGE1, MESSAGE2); - messagesDynamoDb.deleteMessageByDestinationAndGuid(destinationUuid, primary, UUID.fromString(MESSAGE1.getServerGuid())) + messagesDynamoDb.deleteMessage(destinationUuid, primary, UUID.fromString(MESSAGE1.getServerGuid()), MESSAGE1.getServerTimestamp()) .get(1, TimeUnit.SECONDS); assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) - .as("deleting message by guid should work").containsExactly(MESSAGE3, MESSAGE2); - - messagesDynamoDb.deleteMessage(destinationUuid, primary, UUID.fromString(MESSAGE2.getServerGuid()), MESSAGE2.getServerTimestamp()) - .get(1, TimeUnit.SECONDS); - assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)) - .as("deleting message by guid and timestamp should work").containsExactly(MESSAGE3); + .as("deleting message by guid and timestamp should work").containsExactly(MESSAGE2); primary.setCreated(primary.getCreated() + 1000); assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))