Restore "may have messages" methods

This commit is contained in:
Jon Chambers
2025-08-27 13:20:17 -04:00
committed by Jon Chambers
parent f57093a94a
commit ebdc5a30f8
4 changed files with 70 additions and 0 deletions

View File

@@ -300,6 +300,13 @@ public class MessagesCache {
}
public CompletableFuture<Boolean> hasMessagesAsync(final UUID destinationUuid, final byte destinationDevice) {
return redisCluster.withBinaryCluster(connection ->
connection.async().zcard(getMessageQueueKey(destinationUuid, destinationDevice))
.thenApply(cardinality -> cardinality > 0))
.toCompletableFuture();
}
public Publisher<MessageProtos.Envelope> get(final UUID destinationUuid, final byte destinationDevice) {
final long earliestAllowableEphemeralTimestamp =

View File

@@ -32,6 +32,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.observability.micrometer.Micrometer;
@@ -51,6 +52,9 @@ public class MessagesManager {
private static final Counter PERSIST_MESSAGE_BYTES_COUNTER = Metrics.counter(
name(MessagesManager.class, "persistMessageBytes"));
private static final String MAY_HAVE_MESSAGES_COUNTER_NAME =
MetricsUtil.name(MessagesManager.class, "mayHaveMessages");
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
private final RedisMessageAvailabilityManager redisMessageAvailabilityManager;
@@ -178,6 +182,28 @@ public class MessagesManager {
return messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice);
}
public CompletableFuture<Boolean> mayHaveMessages(final UUID destinationUuid, final Device destinationDevice) {
return messagesCache.hasMessagesAsync(destinationUuid, destinationDevice.getId())
.thenCombine(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice),
(mayHaveCachedMessages, mayHavePersistedMessages) -> {
final String outcome;
if (mayHaveCachedMessages && mayHavePersistedMessages) {
outcome = "both";
} else if (mayHaveCachedMessages) {
outcome = "cached";
} else if (mayHavePersistedMessages) {
outcome = "persisted";
} else {
outcome = "none";
}
Metrics.counter(MAY_HAVE_MESSAGES_COUNTER_NAME, "outcome", outcome).increment();
return mayHaveCachedMessages || mayHavePersistedMessages;
});
}
public CompletableFuture<Boolean> mayHaveUrgentPersistedMessages(final UUID destinationUuid, final Device destinationDevice) {
return messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice);
}