Wait for message acknowledgement before fetching new messags from Redis/DynamoDB

This commit is contained in:
Jon Chambers
2025-08-14 12:07:40 -04:00
committed by Jon Chambers
parent 194e43926a
commit 8fe87b77e4
7 changed files with 256 additions and 160 deletions

View File

@@ -42,9 +42,21 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
// and Redis.
private StoredMessageState storedMessageState = StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE;
// The number of messages the downstream subscriber is ready to receive. This changes in response to new requests from
// the downstream subscriber and gets decremented every time we publish a message.
private long unmetDemand = 0;
// Indicates whether we've sent an "initial queue drain complete" signal to subscribers. This state will transition
// from "not ready" to "pending" as soon as the initial message source subscription completes, and then from "pending"
// to "send" when the signal has been sent. This state will never roll backwards.
private QueueEmptySignalState queueEmptySignalState = QueueEmptySignalState.NOT_READY;
// The total requested demand from the subscriber across the whole lifetime of this publisher
private long requestedDemand = 0;
// The total number of signals (new messages or "queue empty" signals) published during the lifetime of this
// publisher. Must not exceed `requestedDemand`.
private long publishedEntries = 0;
// The total number of published signals that have been acknowledged by the subscriber; to avoid re-reading messages,
// we should never start a new message source subscriber until `acknowledgedEntries` is equal to `publishedEntries`
private long acknowledgedEntries = 0;
// Although technically nullable, operation of this publisher really begins once we get a subscriber. This publisher
// supports only a single subscriber.
@@ -54,15 +66,6 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
// terminated, a publisher cannot be un-terminated.
private boolean terminated = false;
// This publisher will emit exactly one "queue empty" signal once the initial contents of the message queue have been
// drained. Once emitted, this flag is set to `true` and will never change again.
private boolean publishedQueueEmptySignal = false;
// …but we may not be able to send the "queue empty" signal downstream immediately if there's no demand. This flag
// tracks whether we're ready to publish a "queue empty" signal, regardless of whether we've actually sent it. Once
// this flag is set to `true`, it will never change again.
private boolean readyToPublishQueueEmptySignal = false;
// A message source subscriber subscribes to messages from upstream data sources (i.e. DynamoDB and Redis), and this
// publisher relays signals the message source subscriber to the downstream subscriber. The message source subscriber
// may be null if we're not actively fetching messages from an upstream source and changes every time an upstream
@@ -83,6 +86,18 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
EMPTY
}
private enum QueueEmptySignalState {
// Indicates that we are not yet ready to send the "initial queue drain complete" signal regardless of outstanding
// demand
NOT_READY,
// Indicates that we are ready to send the "queue empty" signal as soon as demand is available
PENDING,
// Indicates that we have sent the "queue empty" signal and must never send it again
SENT
}
/// A message source subscriber subscribes to upstream message source publishers and relays signals to the downstream
/// subscriber via the parent `RedisDynamoDbMessagePublisher`.
private static class MessageSourceSubscriber extends BaseSubscriber<MessageProtos.Envelope> {
@@ -95,12 +110,7 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
@Override
protected void hookOnSubscribe(final Subscription subscription) {
final long unmetDemand = redisDynamoDbMessagePublisher.getUnmetDemand();
// If we already have some unmet demand, pass that on to the upstream publisher immediately on subscribing
if (unmetDemand > 0) {
subscription.request(unmetDemand);
}
redisDynamoDbMessagePublisher.handleMessageSourceSubscribed(subscription);
}
@Override
@@ -188,10 +198,16 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
assert subscriber != null;
if (!terminated) {
terminate();
subscriber.onError(new ConflictingMessageConsumerException());
}
}
terminate();
synchronized void handleMessageAcknowledged() {
acknowledgedEntries += 1;
assert acknowledgedEntries <= publishedEntries;
maybeGenerateMessageSource();
}
private synchronized boolean maybeSendQueueEmptySignal() {
@@ -203,13 +219,18 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
// The machinery that produces messages won't activate until we have a subscriber
assert subscriber != null;
if (readyToPublishQueueEmptySignal && !publishedQueueEmptySignal && getUnmetDemand() > 0) {
if (queueEmptySignalState == QueueEmptySignalState.PENDING && publishedEntries < requestedDemand) {
queueEmptySignalState = QueueEmptySignalState.SENT;
publishedEntries += 1;
// Subscribers don't explicitly acknowledge "queue empty" signals, and we can consider them automatically
// acknowledged
acknowledgedEntries += 1;
subscriber.onNext(new MessageStreamEntry.QueueEmpty());
unmetDemand -= 1;
assert unmetDemand >= 0;
publishedQueueEmptySignal = true;
assert publishedEntries <= requestedDemand;
assert acknowledgedEntries <= publishedEntries;
return true;
}
@@ -218,14 +239,27 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
}
private synchronized void maybeGenerateMessageSource() {
// Regardless of any other state, don't do anything if terminated
if (terminated) {
// Regardless of any other state, don't do anything if terminated
return;
}
if (storedMessageState == StoredMessageState.EMPTY || unmetDemand == 0) {
// We don't think there are any messages in either source or there's no demand for messages; either way, wait for
// things to change before trying to generate a message source
if (storedMessageState == StoredMessageState.EMPTY) {
// We don't think there are any messages in either message source; don't do anything until the situation changes
// (when new messages arrive, we'll come back to this point with a non-empty stored message state)
return;
}
if (publishedEntries == requestedDemand) {
// Even if there are messages available, there's no demand for them yet (when there's new demand, we'll come back
// to this point with a higher value for `requestedDemand` via `addDemand`)
return;
}
if (acknowledgedEntries < publishedEntries) {
// To avoid double-reading messages from data stores that don't support cursors, don't get a new message source
// unless all previously-published signals have been acknowledged (when messages are acknowledged, we'll come back
// to this point with a higher value for `acknowledgedEntries` via `handleMessageAcknowledged`)
return;
}
@@ -249,14 +283,25 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
storedMessageState = StoredMessageState.EMPTY;
}
private synchronized void handleMessageSourceSubscribed(final Subscription subscription) {
if (!terminated) {
// If we already have some unmet demand, pass that on to the upstream publisher immediately on subscribing
if (requestedDemand > publishedEntries) {
subscription.request(requestedDemand - publishedEntries);
}
}
}
private synchronized void handleNextMessage(final MessageProtos.Envelope message) {
// The machinery that produces messages won't activate until we have a subscriber
assert subscriber != null;
if (!terminated) {
unmetDemand -= 1;
assert unmetDemand >= 0;
// We only pass along unfulfilled demand to the message source subscriber, so if the message source subscriber
// emits a new signal, it should fit within the unfulfilled demand from the downstream subscriber
assert publishedEntries < requestedDemand;
publishedEntries += 1;
subscriber.onNext(new MessageStreamEntry.Envelope(message));
}
}
@@ -267,9 +312,10 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
messageSourceSubscriber = null;
// Attempt to send a "queue empty" signal if we haven't already
readyToPublishQueueEmptySignal = true;
maybeSendQueueEmptySignal();
if (queueEmptySignalState == QueueEmptySignalState.NOT_READY) {
queueEmptySignalState = QueueEmptySignalState.PENDING;
maybeSendQueueEmptySignal();
}
// New messages may have arrived already; fetch them if possible
maybeGenerateMessageSource();
@@ -280,8 +326,8 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
assert subscriber != null;
if (!terminated) {
subscriber.onError(throwable);
terminate();
subscriber.onError(throwable);
}
}
@@ -290,9 +336,8 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
throw new IllegalArgumentException("Demand must be positive");
}
unmetDemand += demand;
requestedDemand += demand;
// We may have been waiting for non-zero demand before sending a "queue empty" signal
final boolean sentQueueEmptySignal = maybeSendQueueEmptySignal();
// This is a little tricky; if we already have a subscriber, we only want to request NEW demand, not the total
@@ -308,20 +353,16 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow
}
}
private synchronized long getUnmetDemand() {
return unmetDemand;
}
private synchronized void terminate() {
if (!terminated) {
terminated = true;
// Stop receiving signals about new messages/conflicting consumers
redisMessageAvailabilityManager.handleClientDisconnected(accountIdentifier, device.getId());
if (messageSourceSubscriber != null) {
messageSourceSubscriber.dispose();
}
// Stop receiving signals about new messages/conflicting consumers
redisMessageAvailabilityManager.handleClientDisconnected(accountIdentifier, device.getId());
}
}
}

View File

@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.storage;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.util.Util;
@@ -29,16 +30,25 @@ public class RedisDynamoDbMessageStream implements MessageStream {
final UUID accountIdentifier,
final Device device) {
this(messagesDynamoDb, messagesCache, accountIdentifier, device, new RedisDynamoDbMessagePublisher(messagesDynamoDb,
messagesCache,
redisMessageAvailabilityManager,
accountIdentifier,
device));
}
@VisibleForTesting
RedisDynamoDbMessageStream(final MessagesDynamoDb messagesDynamoDb,
final MessagesCache messagesCache,
final UUID accountIdentifier,
final Device device,
final RedisDynamoDbMessagePublisher messagePublisher) {
this.messagesDynamoDb = messagesDynamoDb;
this.messagesCache = messagesCache;
this.accountIdentifier = accountIdentifier;
this.device = device;
this.messagePublisher = new RedisDynamoDbMessagePublisher(messagesDynamoDb,
messagesCache,
redisMessageAvailabilityManager,
accountIdentifier,
device);
this.messagePublisher = messagePublisher;
}
@Override
@@ -54,6 +64,7 @@ public class RedisDynamoDbMessageStream implements MessageStream {
.thenCompose(removed -> removed.map(_ -> CompletableFuture.<Void>completedFuture(null))
.orElseGet(() ->
messagesDynamoDb.deleteMessage(accountIdentifier, device, guid, message.getServerTimestamp())
.thenRun(Util.NOOP)));
.thenRun(Util.NOOP)))
.whenComplete((_, _) -> messagePublisher.handleMessageAcknowledged());
}
}

View File

@@ -56,7 +56,6 @@ import reactor.core.Disposable;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry;
public class WebSocketConnection implements DisconnectionRequestListener {
@@ -103,8 +102,6 @@ public class WebSocketConnection implements DisconnectionRequestListener {
private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final Retry retrySpec;
private final Account authenticatedAccount;
private final Device authenticatedDevice;
private final MessageStream messageStream;
@@ -131,38 +128,6 @@ public class WebSocketConnection implements DisconnectionRequestListener {
final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor,
final ExperimentEnrollmentManager experimentEnrollmentManager) {
this(receiptSender,
messagesManager,
messageMetrics,
pushNotificationManager,
pushNotificationScheduler,
authenticatedAccount,
authenticatedDevice,
client,
messageDeliveryScheduler,
clientReleaseManager,
messageDeliveryLoopMonitor,
experimentEnrollmentManager,
Retry.backoff(4, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(2))
.filter(throwable -> !isConnectionClosedException(throwable)));
}
@VisibleForTesting
WebSocketConnection(final ReceiptSender receiptSender,
final MessagesManager messagesManager,
final MessageMetrics messageMetrics,
final PushNotificationManager pushNotificationManager,
final PushNotificationScheduler pushNotificationScheduler,
final Account authenticatedAccount,
final Device authenticatedDevice,
final WebSocketClient client,
final Scheduler messageDeliveryScheduler,
final ClientReleaseManager clientReleaseManager,
final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor,
final ExperimentEnrollmentManager experimentEnrollmentManager,
final Retry retrySpec) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.messageMetrics = messageMetrics;
@@ -176,8 +141,6 @@ public class WebSocketConnection implements DisconnectionRequestListener {
this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor;
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.retrySpec = retrySpec;
this.messageStream =
messagesManager.getMessages(authenticatedAccount.getIdentifier(IdentityType.ACI), authenticatedDevice);
@@ -214,9 +177,7 @@ public class WebSocketConnection implements DisconnectionRequestListener {
}
})
.flatMapSequential(entry -> switch (entry) {
case MessageStreamEntry.Envelope envelope -> Mono.fromFuture(() -> sendMessage(envelope.message()))
.retryWhen(retrySpec)
.thenReturn(entry);
case MessageStreamEntry.Envelope envelope -> Mono.fromFuture(() -> sendMessage(envelope.message())).thenReturn(entry);
case MessageStreamEntry.QueueEmpty _ -> Mono.just(entry);
}, MESSAGE_SENDER_MAX_CONCURRENCY)
.subscribeOn(messageDeliveryScheduler)