diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 9830dade5..9db7dc655 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -168,7 +168,9 @@ public class MessagesCache { static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); private static final String GET_FLUX_NAME = MetricsUtil.name(MessagesCache.class, "get"); - private static final int PAGE_SIZE = 100; + + @VisibleForTesting + static final int PAGE_SIZE = 100; private static final int REMOVE_MRM_RECIPIENT_VIEW_CONCURRENCY = 8; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 0b0f699d9..6beecbd94 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -272,6 +272,40 @@ class MessagesCacheTest { assertEquals(List.of(message2), get(DESTINATION_UUID, DESTINATION_DEVICE_ID, 1)); } + @Test + void testGetMessagesFirstPageDiscarded() { + final int discardableMessages = MessagesCache.PAGE_SIZE * 2; + final int deliverableMessages = MessagesCache.PAGE_SIZE + 1; + + final Instant now = Instant.now(); + final ServiceIdentifier destinationServiceIdentifier = new AciServiceIdentifier(DESTINATION_UUID); + + for (int i = 0; i < discardableMessages; i++) { + final UUID messageGuid = UUID.randomUUID(); + final long timestamp = now.minus(MessagesCache.MAX_EPHEMERAL_MESSAGE_DELAY.multipliedBy(2)).toEpochMilli() + i; + + final MessageProtos.Envelope message = + generateRandomMessage(messageGuid, destinationServiceIdentifier, true, true, timestamp); + + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message).join(); + } + + final List expectedMessages = new ArrayList<>(deliverableMessages); + + for (int i = 0; i < deliverableMessages; i++) { + final UUID messageGuid = UUID.randomUUID(); + final long timestamp = now.plusMillis(i).toEpochMilli(); + + final MessageProtos.Envelope message = + generateRandomMessage(messageGuid, destinationServiceIdentifier, true, false, timestamp); + + messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, message).join(); + expectedMessages.add(message); + } + + assertEquals(expectedMessages, get(DESTINATION_UUID, DESTINATION_DEVICE_ID, deliverableMessages + discardableMessages)); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testGetMessagesPublisher(final boolean expectStale) throws Exception { @@ -896,24 +930,30 @@ class MessagesCacheTest { } private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, final boolean sealedSender) { - return generateRandomMessage(messageGuid, new AciServiceIdentifier(UUID.randomUUID()), sealedSender, + return generateRandomMessage(messageGuid, new AciServiceIdentifier(UUID.randomUUID()), sealedSender, false, serialTimestamp++); } private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, - final ServiceIdentifier destinationServiceId, final boolean sealedSender) { - return generateRandomMessage(messageGuid, destinationServiceId, sealedSender, serialTimestamp++); + final ServiceIdentifier destinationServiceId, + final boolean sealedSender) { + + return generateRandomMessage(messageGuid, destinationServiceId, sealedSender, false, serialTimestamp++); } private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, - final ServiceIdentifier destinationServiceId, final boolean sealedSender, final long timestamp) { + final ServiceIdentifier destinationServiceId, + final boolean sealedSender, + final boolean ephemeral, + final long timestamp) { final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder() .setClientTimestamp(timestamp) .setServerTimestamp(timestamp) .setContent(ByteString.copyFromUtf8(RandomStringUtils.secure().nextAlphanumeric(256))) .setType(MessageProtos.Envelope.Type.CIPHERTEXT) .setServerGuid(messageGuid.toString()) - .setDestinationServiceId(destinationServiceId.toServiceIdentifierString()); + .setDestinationServiceId(destinationServiceId.toServiceIdentifierString()) + .setEphemeral(ephemeral); if (!sealedSender) { envelopeBuilder.setSourceDevice(random.nextInt(Device.MAXIMUM_DEVICE_ID) + 1)