Add methods to test whether a device may have unread messages

This commit is contained in:
Jon Chambers
2024-06-24 15:19:48 -04:00
committed by GitHub
parent fa1281ae86
commit 9b7af00cf5
6 changed files with 119 additions and 0 deletions

View File

@@ -213,6 +213,13 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
connection -> connection.sync().zcard(getMessageQueueKey(destinationUuid, destinationDevice)) > 0);
}
public CompletableFuture<Boolean> hasMessagesAsync(final UUID destinationUuid, final byte destinationDevice) {
return redisCluster.withBinaryCluster(connection ->
connection.async().zcard(getMessageQueueKey(destinationUuid, destinationDevice))
.thenApply(cardinality -> cardinality > 0))
.toCompletableFuture();
}
public Publisher<MessageProtos.Envelope> get(final UUID destinationUuid, final byte destinationDevice) {
final long earliestAllowableEphemeralTimestamp =

View File

@@ -129,6 +129,40 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
Metrics.counter(MESSAGES_STORED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(writeItems.size());
}
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
return Flux.fromIterable(dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes())
.flatMap(scheme -> mayHaveMessages(accountIdentifier, device, scheme))
.any(mayHaveMessages -> mayHaveMessages)
.toFuture();
}
private Mono<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device, final DynamoKeyScheme scheme) {
final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device, scheme);
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
.tableName(tableName)
.consistentRead(false)
.limit(1);
queryRequestBuilder = switch (scheme) {
case TRADITIONAL -> queryRequestBuilder
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(device.getId(), scheme)));
case LAZY_DELETION -> queryRequestBuilder
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
};
return Mono.fromFuture(dbAsyncClient.query(queryRequestBuilder.build())
.thenApply(queryResponse -> queryResponse.count() > 0));
}
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
return Flux.concat(
dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()

View File

@@ -63,6 +63,13 @@ public class MessagesManager {
}
}
public CompletableFuture<Boolean> mayHaveMessages(final UUID destinationUuid, final Device destinationDevice) {
return messagesCache.hasMessagesAsync(destinationUuid, destinationDevice.getId())
.thenCompose(hasMessages -> hasMessages
? CompletableFuture.completedFuture(true)
: messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice));
}
public boolean hasCachedMessages(final UUID destinationUuid, final byte destinationDevice) {
return messagesCache.hasMessages(destinationUuid, destinationDevice);
}