diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java index 0a91e984f..6223f7929 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java @@ -244,6 +244,11 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow return; } + if (messageSourceSubscriber != null) { + // We're still working through a previous message source; wait until it completes before starting a new one. + return; + } + if (storedMessageState == StoredMessageState.EMPTY) { // We don't think there are any messages in either message source; don't do anything until the situation changes // (when new messages arrive, we'll come back to this point with a non-empty stored message state) @@ -359,6 +364,7 @@ class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow if (messageSourceSubscriber != null) { messageSourceSubscriber.dispose(); + messageSourceSubscriber = null; } // Stop receiving signals about new messages/conflicting consumers