diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 6859b3e2a..ea6874c01 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -633,8 +633,10 @@ public class WhisperServerService extends Application displacedListener.get().handleConflictingMessageReader()); + listenerEventExecutor.execute(() -> displacedListener.get().handleConflictingMessageConsumer()); } return subscribeFuture.get() @@ -318,7 +318,7 @@ public class RedisMessageAvailabilityManager extends RedisClusterPubSubAdapter getMessages(); + + /// Acknowledges receipt of the given message. Implementations may delete the message immediately or defer deletion for + /// inclusion in a more efficient batch deletion. + /// + /// @param message the message to acknowledge + /// + /// @return a future that completes when the message stream has processed the acknowledgement + CompletableFuture acknowledgeMessage(MessageProtos.Envelope message); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageStreamEntry.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageStreamEntry.java new file mode 100644 index 000000000..83df74d0d --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageStreamEntry.java @@ -0,0 +1,26 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import org.whispersystems.textsecuregcm.entities.MessageProtos; + +/// A `MessageStreamEntr` is an entity that can be emitted by the publisher returned by [MessageStream#getMessages()]. +/// Message stream entries either produce an individual message (see [Envelope]) or that the initial contents of a +/// message queue have been drained (see [QueueEmpty]). +public sealed interface MessageStreamEntry permits MessageStreamEntry.Envelope, MessageStreamEntry.QueueEmpty { + + /// A message stream entry that carries a single message. + /// + /// @param message the message emitted by the publisher + record Envelope(MessageProtos.Envelope message) implements MessageStreamEntry { + } + + /// A message stream entry that indicates that the initial contents of a message queue have been emitted by the + /// publisher; any [Envelope] entries after a `QueueEmpty` entry arrived after caller started reading + /// messages from the queue. + record QueueEmpty() implements MessageStreamEntry { + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 45db80882..3f049a6fb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -57,6 +57,7 @@ public class MessagesManager { private final MessagesDynamoDb messagesDynamoDb; private final MessagesCache messagesCache; + private final RedisMessageAvailabilityManager redisMessageAvailabilityManager; private final ReportMessageManager reportMessageManager; private final ExecutorService messageDeletionExecutor; private final Clock clock; @@ -64,12 +65,14 @@ public class MessagesManager { public MessagesManager( final MessagesDynamoDb messagesDynamoDb, final MessagesCache messagesCache, + final RedisMessageAvailabilityManager redisMessageAvailabilityManager, final ReportMessageManager reportMessageManager, final ExecutorService messageDeletionExecutor, final Clock clock) { this.messagesDynamoDb = messagesDynamoDb; this.messagesCache = messagesCache; + this.redisMessageAvailabilityManager = redisMessageAvailabilityManager; this.reportMessageManager = reportMessageManager; this.messageDeletionExecutor = messageDeletionExecutor; this.clock = clock; @@ -221,6 +224,10 @@ public class MessagesManager { return getMessagesForDevice(destinationUuid, destinationDevice, null, cachedMessagesOnly); } + public MessageStream getMessages(final UUID destinationUuid, final Device destinationDevice) { + return new RedisDynamoDbMessageStream(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, destinationUuid, destinationDevice); + } + private Publisher getMessagesForDevice(UUID destinationUuid, Device destinationDevice, @Nullable Integer limit, final boolean cachedMessagesOnly) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java new file mode 100644 index 000000000..485c4e8cc --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisher.java @@ -0,0 +1,327 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.micrometer.core.instrument.Metrics; +import java.util.UUID; +import java.util.concurrent.Flow; +import javax.annotation.Nullable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.MessageAvailabilityListener; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; +import reactor.core.observability.micrometer.Micrometer; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; + +/// A Redis/DynamoDB message publisher produces a non-terminating stream of messages for a specific device. It listens +/// for message availability signals from [RedisMessageAvailabilityManager] and emits new messages to its subscriber +/// when available. +/// +/// This publisher supports only a single subscriber. It assumes that subscribers acknowledge (delete) messages as they +/// read the messages, and may emit duplicate messages if subscribers do not acknowledge messages before requesting more +/// messages. +class RedisDynamoDbMessagePublisher implements MessageAvailabilityListener, Flow.Publisher { + + private final MessagesDynamoDb messagesDynamoDb; + private final MessagesCache messagesCache; + private final RedisMessageAvailabilityManager redisMessageAvailabilityManager; + + private final UUID accountIdentifier; + private final Device device; + + // Indicates which data source(s) we think might contain messages for the destination device. Messages initially land + // in Redis, but are eventually "persisted" to DynamoDB. This state changes in response to signals this publisher + // receives as a MessageAvailabilityListener. As an initial state, we assume that we have messages in both DynamoDB + // and Redis. + private StoredMessageState storedMessageState = StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE; + + // The number of messages the downstream subscriber is ready to receive. This changes in response to new requests from + // the downstream subscriber and gets decremented every time we publish a message. + private long unmetDemand = 0; + + // Although technically nullable, operation of this publisher really begins once we get a subscriber. This publisher + // supports only a single subscriber. + @Nullable private Flow.Subscriber subscriber; + + // If terminated (i.e. by an error or by downstream cancellation), this publisher will stop emitting signals. Once + // terminated, a publisher cannot be un-terminated. + private boolean terminated = false; + + // This publisher will emit exactly one "queue empty" signal once the initial contents of the message queue have been + // drained. Once emitted, this flag is set to `true` and will never change again. + private boolean publishedQueueEmptySignal = false; + + // …but we may not be able to send the "queue empty" signal downstream immediately if there's no demand. This flag + // tracks whether we're ready to publish a "queue empty" signal, regardless of whether we've actually sent it. Once + // this flag is set to `true`, it will never change again. + private boolean readyToPublishQueueEmptySignal = false; + + // A message source subscriber subscribes to messages from upstream data sources (i.e. DynamoDB and Redis), and this + // publisher relays signals the message source subscriber to the downstream subscriber. The message source subscriber + // may be null if we're not actively fetching messages from an upstream source and changes every time an upstream + // publisher completes. + @Nullable private MessageSourceSubscriber messageSourceSubscriber; + + private static final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = + name(RedisDynamoDbMessagePublisher.class, "getMessagesForDevice"); + + private enum StoredMessageState { + // Indicates that stored messages are available in at least DynamoDB and possibly also Redis + PERSISTED_NEW_MESSAGES_AVAILABLE, + + // Indicates that messages are available in Redis, but have not yet been persisted to DynamoDB + CACHED_NEW_MESSAGES_AVAILABLE, + + // Indicates that no new messages are available in either Redis or DynamoDB + EMPTY + } + + /// A message source subscriber subscribes to upstream message source publishers and relays signals to the downstream + /// subscriber via the parent `RedisDynamoDbMessagePublisher`. + private static class MessageSourceSubscriber extends BaseSubscriber { + + private final RedisDynamoDbMessagePublisher redisDynamoDbMessagePublisher; + + private MessageSourceSubscriber(RedisDynamoDbMessagePublisher redisDynamoDbMessagePublisher) { + this.redisDynamoDbMessagePublisher = redisDynamoDbMessagePublisher; + } + + @Override + protected void hookOnSubscribe(final Subscription subscription) { + final long unmetDemand = redisDynamoDbMessagePublisher.getUnmetDemand(); + + // If we already have some unmet demand, pass that on to the upstream publisher immediately on subscribing + if (unmetDemand > 0) { + subscription.request(unmetDemand); + } + } + + @Override + protected void hookOnNext(final MessageProtos.Envelope message) { + redisDynamoDbMessagePublisher.handleNextMessage(message); + } + + @Override + protected void hookOnComplete() { + redisDynamoDbMessagePublisher.handleMessageSourceComplete(); + } + + @Override + protected void hookOnError(final Throwable throwable) { + redisDynamoDbMessagePublisher.handleMessageSourceError(throwable); + } + } + + RedisDynamoDbMessagePublisher(final MessagesDynamoDb messagesDynamoDb, + final MessagesCache messagesCache, + final RedisMessageAvailabilityManager redisMessageAvailabilityManager, + final UUID accountIdentifier, + final Device device) { + + this.messagesDynamoDb = messagesDynamoDb; + this.messagesCache = messagesCache; + this.redisMessageAvailabilityManager = redisMessageAvailabilityManager; + this.accountIdentifier = accountIdentifier; + this.device = device; + } + + @Override + public synchronized void subscribe(final Flow.Subscriber subscriber) { + if (this.subscriber != null) { + subscriber.onError(new IllegalStateException("Redis/DynamoDB message publisher only allows one subscriber")); + return; + } + + this.subscriber = subscriber; + + // Listen for signals indicating that new messages are available in Redis, that messages have been persisted from + // Redis to DynamoDB, or that there's a conflicting message reader connected somewhere else + redisMessageAvailabilityManager.handleClientConnected(accountIdentifier, device.getId(), this); + + subscriber.onSubscribe(new Flow.Subscription() { + @Override + public void request(final long n) { + addDemand(n); + } + + @Override + public void cancel() { + terminate(); + } + }); + } + + @Override + public synchronized void handleNewMessageAvailable() { + // We only need to take action if we think there aren't already messages to pass downstream. Any other stored + // message state implies that we're either actively sending messages downstream or we're waiting for demand from the + // downstream subscriber and don't need to take any action now. We'll call `maybeGenerateMessageSource` either when + // we receive a request for more messages or when the current upstream publisher completes. + if (storedMessageState == StoredMessageState.EMPTY) { + storedMessageState = StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE; + maybeGenerateMessageSource(); + } + } + + @Override + public synchronized void handleMessagesPersisted() { + // We only need to take action if we think there aren't already messages in DynamoDB. If we're already aware of + // messages in DynamoDB, then we're either actively sending messages downstream or we're waiting for demand from the + // downstream subscriber and don't need to take any action now. We'll call `maybeGenerateMessageSource` either when + // we receive a request for more messages or when the current upstream publisher completes. + if (storedMessageState != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE) { + storedMessageState = StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE; + maybeGenerateMessageSource(); + } + } + + @Override + public synchronized void handleConflictingMessageConsumer() { + // We don't register as a listener for conflicting consumer signals until we have a subscriber + assert subscriber != null; + + if (!terminated) { + subscriber.onError(new ConflictingMessageConsumerException()); + } + + terminate(); + } + + private synchronized boolean maybeSendQueueEmptySignal() { + // Regardless of any other state, don't do anything if terminated + if (terminated) { + return false; + } + + // The machinery that produces messages won't activate until we have a subscriber + assert subscriber != null; + + if (readyToPublishQueueEmptySignal && !publishedQueueEmptySignal && getUnmetDemand() > 0) { + subscriber.onNext(new MessageStreamEntry.QueueEmpty()); + unmetDemand -= 1; + + assert unmetDemand >= 0; + + publishedQueueEmptySignal = true; + + return true; + } + + return false; + } + + private synchronized void maybeGenerateMessageSource() { + // Regardless of any other state, don't do anything if terminated + if (terminated) { + return; + } + + if (storedMessageState == StoredMessageState.EMPTY || unmetDemand == 0) { + // We don't think there are any messages in either source or there's no demand for messages; either way, wait for + // things to change before trying to generate a message source + return; + } + + // We maybe be able to skip reading from DynamoDB entirely if we think messages are only stored in Redis + final Publisher dynamoPublisher = + storedMessageState == StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE + ? messagesDynamoDb.load(accountIdentifier, device, null) + : Flux.empty(); + + final Publisher redisPublisher = messagesCache.get(accountIdentifier, device.getId()); + + final Flux messageSource = Flux.concat(dynamoPublisher, redisPublisher) + .name(GET_MESSAGES_FOR_DEVICE_FLUX_NAME) + .tap(Micrometer.metrics(Metrics.globalRegistry)); + + messageSourceSubscriber = new MessageSourceSubscriber(this); + messageSource.subscribe(messageSourceSubscriber); + + // If nothing else happens before the DynamoDB/Redis publisher completes, then we'll have emptied all stored + // messages; new signals about persisted messages or newly-arrived messages will change this state + storedMessageState = StoredMessageState.EMPTY; + } + + private synchronized void handleNextMessage(final MessageProtos.Envelope message) { + // The machinery that produces messages won't activate until we have a subscriber + assert subscriber != null; + + if (!terminated) { + unmetDemand -= 1; + assert unmetDemand >= 0; + + subscriber.onNext(new MessageStreamEntry.Envelope(message)); + } + } + + private synchronized void handleMessageSourceComplete() { + // The machinery that produces messages won't activate until we have a subscriber + assert subscriber != null; + + messageSourceSubscriber = null; + + // Attempt to send a "queue empty" signal if we haven't already + readyToPublishQueueEmptySignal = true; + maybeSendQueueEmptySignal(); + + // New messages may have arrived already; fetch them if possible + maybeGenerateMessageSource(); + } + + private synchronized void handleMessageSourceError(final Throwable throwable) { + // The machinery that produces messages won't activate until we have a subscriber + assert subscriber != null; + + if (!terminated) { + subscriber.onError(throwable); + terminate(); + } + } + + private synchronized void addDemand(final long demand) { + if (demand <= 0) { + throw new IllegalArgumentException("Demand must be positive"); + } + + unmetDemand += demand; + + // We may have been waiting for non-zero demand before sending a "queue empty" signal + final boolean sentQueueEmptySignal = maybeSendQueueEmptySignal(); + + // This is a little tricky; if we already have a subscriber, we only want to request NEW demand, not the total + // outstanding demand. On top of that, we may have consumed some demand by sending a "queue empty" message. + final long newDemand = demand - (sentQueueEmptySignal ? 1 : 0); + + if (newDemand > 0) { + if (messageSourceSubscriber != null) { + messageSourceSubscriber.request(newDemand); + } else { + maybeGenerateMessageSource(); + } + } + } + + private synchronized long getUnmetDemand() { + return unmetDemand; + } + + private synchronized void terminate() { + if (!terminated) { + terminated = true; + + // Stop receiving signals about new messages/conflicting consumers + redisMessageAvailabilityManager.handleClientDisconnected(accountIdentifier, device.getId()); + + if (messageSourceSubscriber != null) { + messageSourceSubscriber.dispose(); + } + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessageStream.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessageStream.java new file mode 100644 index 000000000..c0dbaed66 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessageStream.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; +import org.whispersystems.textsecuregcm.util.Util; + +/// A [MessageStream] implementation that produces message from a joint DynamoDB/Redis message store. +public class RedisDynamoDbMessageStream implements MessageStream { + + private final MessagesDynamoDb messagesDynamoDb; + private final MessagesCache messagesCache; + + private final UUID accountIdentifier; + private final Device device; + + private final RedisDynamoDbMessagePublisher messagePublisher; + + public RedisDynamoDbMessageStream(final MessagesDynamoDb messagesDynamoDb, + final MessagesCache messagesCache, + final RedisMessageAvailabilityManager redisMessageAvailabilityManager, + final UUID accountIdentifier, + final Device device) { + + this.messagesDynamoDb = messagesDynamoDb; + this.messagesCache = messagesCache; + this.accountIdentifier = accountIdentifier; + this.device = device; + + this.messagePublisher = new RedisDynamoDbMessagePublisher(messagesDynamoDb, + messagesCache, + redisMessageAvailabilityManager, + accountIdentifier, + device); + } + + @Override + public Flow.Publisher getMessages() { + return messagePublisher; + } + + @Override + public CompletableFuture acknowledgeMessage(final MessageProtos.Envelope message) { + final UUID guid = UUID.fromString(message.getServerGuid()); + + return messagesCache.remove(accountIdentifier, device.getId(), guid) + .thenCompose(removed -> removed.map(_ -> CompletableFuture.completedFuture(null)) + .orElseGet(() -> + messagesDynamoDb.deleteMessage(accountIdentifier, device, guid, message.getServerTimestamp()) + .thenRun(Util.NOOP))); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 8e5bbec4a..6bc46442a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -429,7 +429,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn } @Override - public void handleConflictingMessageReader() { + public void handleConflictingMessageConsumer() { closeConnection(4409, "Connected elsewhere"); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 1a1abb3a2..ab20c8605 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -266,7 +266,9 @@ record CommandDependencies( configuration.getReportMessageConfiguration().getReportTtl()); ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster, configuration.getReportMessageConfiguration().getCounterTtl()); - MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, + RedisMessageAvailabilityManager redisMessageAvailabilityManager = + new RedisMessageAvailabilityManager(messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor); + MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, messageDeletionExecutor, Clock.systemUTC()); AccountLockManager accountLockManager = new AccountLockManager(dynamoDbClient, configuration.getDynamoDbTables().getDeletedAccountsLock().getTableName()); @@ -321,9 +323,6 @@ record CommandDependencies( configuration.getDynamoDbTables().getPushNotificationExperimentSamples().getTableName(), Clock.systemUTC()); - RedisMessageAvailabilityManager redisMessageAvailabilityManager = - new RedisMessageAvailabilityManager(messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor); - final DynamoDbRecoveryManager dynamoDbRecoveryManager = new DynamoDbRecoveryManager(accounts, phoneNumberIdentifiers); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java index a32807beb..2c0343424 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java @@ -62,7 +62,7 @@ class RedisMessageAvailabilityManagerTest { } @Override - public void handleConflictingMessageReader() { + public void handleConflictingMessageConsumer() { } } @@ -110,7 +110,7 @@ class RedisMessageAvailabilityManagerTest { localEventManager.handleClientConnected(accountIdentifier, deviceId, new MessageAvailabilityAdapter() { @Override - public void handleConflictingMessageReader() { + public void handleConflictingMessageConsumer() { synchronized (firstListenerDisplaced) { firstListenerDisplaced.set(true); firstListenerDisplaced.notifyAll(); @@ -126,7 +126,7 @@ class RedisMessageAvailabilityManagerTest { displacingManager.handleClientConnected(accountIdentifier, deviceId, new MessageAvailabilityAdapter() { @Override - public void handleConflictingMessageReader() { + public void handleConflictingMessageConsumer() { secondListenerDisplaced.set(true); } }).toCompletableFuture().join(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index cd0cb74a9..3d6d59d14 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -82,8 +82,8 @@ class MessagePersisterIntegrationTest { messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC()); - messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), - messageDeletionExecutorService, Clock.systemUTC()); + messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(RedisMessageAvailabilityManager.class), + mock(ReportMessageManager.class), messageDeletionExecutorService, Clock.systemUTC()); websocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor(); asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor(); @@ -165,7 +165,7 @@ class MessagePersisterIntegrationTest { } @Override - public void handleConflictingMessageReader() { + public void handleConflictingMessageConsumer() { } }); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java index c7691c0c8..fd01c3a9a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesManagerTest.java @@ -40,6 +40,7 @@ import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.PniServiceIdentifier; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.tests.util.MultiRecipientMessageHelper; import org.whispersystems.textsecuregcm.tests.util.TestRecipient; import org.whispersystems.textsecuregcm.util.TestClock; @@ -54,7 +55,7 @@ class MessagesManagerTest { private static final TestClock CLOCK = TestClock.pinned(Instant.now()); private final MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, - reportMessageManager, Executors.newSingleThreadExecutor(), CLOCK); + mock(RedisMessageAvailabilityManager.class), reportMessageManager, Executors.newSingleThreadExecutor(), CLOCK); @BeforeEach void setUp() { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisherTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisherTest.java new file mode 100644 index 000000000..5d8821f09 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessagePublisherTest.java @@ -0,0 +1,390 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; +import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; +import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; +import reactor.adapter.JdkFlowAdapter; +import reactor.core.Disposable; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) +class RedisDynamoDbMessagePublisherTest { + + private MessagesDynamoDb messagesDynamoDb; + private MessagesCache messagesCache; + private RedisMessageAvailabilityManager redisMessageAvailabilityManager; + + private static ExecutorService sharedExecutorService; + private static Scheduler messageDeliveryScheduler; + + private Device destinationDevice; + + @RegisterExtension + static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension(DynamoDbExtensionSchema.Tables.MESSAGES); + + @RegisterExtension + static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); + + private static final AtomicLong SERIAL_TIMESTAMP = new AtomicLong(0); + + private static final ServiceIdentifier DESTINATION_SERVICE_IDENTIFIER = new AciServiceIdentifier(UUID.randomUUID()); + + @BeforeAll + static void setUpBeforeAll() { + sharedExecutorService = Executors.newVirtualThreadPerTaskExecutor(); + messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); + } + + @BeforeEach + void setUp() throws IOException { + messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), + DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), + DynamoDbExtensionSchema.Tables.MESSAGES.tableName(), + Duration.ofDays(14), + sharedExecutorService); + + messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), + messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); + + redisMessageAvailabilityManager = mock(RedisMessageAvailabilityManager.class); + + destinationDevice = mock(Device.class); + when(destinationDevice.getId()).thenReturn(Device.PRIMARY_ID); + when(destinationDevice.getCreated()).thenReturn(System.currentTimeMillis()); + } + + @AfterAll + static void tearDownAfterAll() { + sharedExecutorService.shutdown(); + messageDeliveryScheduler.dispose(); + } + + @Test + void subscribeDispose() { + final byte deviceId = Device.PRIMARY_ID; + + final Device device = mock(Device.class); + when(device.getId()).thenReturn(deviceId); + when(device.getCreated()).thenReturn(System.currentTimeMillis()); + + { + final UUID accountIdentifier = UUID.randomUUID(); + + final RedisDynamoDbMessagePublisher _ = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, accountIdentifier, device); + + verify(redisMessageAvailabilityManager, never()).handleClientConnected(eq(accountIdentifier), eq(deviceId), any()); + verify(redisMessageAvailabilityManager, never()).handleClientDisconnected(eq(accountIdentifier), eq(deviceId)); + } + + { + final UUID accountIdentifier = UUID.randomUUID(); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, accountIdentifier, device); + + JdkFlowAdapter.flowPublisherToFlux(messagePublisher).subscribe(); + + verify(redisMessageAvailabilityManager).handleClientConnected(eq(accountIdentifier), eq(deviceId), any()); + verify(redisMessageAvailabilityManager, never()).handleClientDisconnected(eq(accountIdentifier), eq(deviceId)); + } + + { + final UUID accountIdentifier = UUID.randomUUID(); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, accountIdentifier, device); + + final Disposable disposable = JdkFlowAdapter.flowPublisherToFlux(messagePublisher).subscribe(); + disposable.dispose(); + + verify(redisMessageAvailabilityManager).handleClientConnected(eq(accountIdentifier), eq(deviceId), any()); + verify(redisMessageAvailabilityManager).handleClientDisconnected(eq(accountIdentifier), eq(deviceId)); + } + } + + @Test + void publishMessages() { + final MessageProtos.Envelope dynamoDbMessage = insertDynamoDbMessage(generateRandomMessage()); + final MessageProtos.Envelope redisMessage = insertRedisMessage(generateRandomMessage()); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher)) + .expectNext(new MessageStreamEntry.Envelope(dynamoDbMessage)) + .expectNext(new MessageStreamEntry.Envelope(redisMessage)) + .expectNext(new MessageStreamEntry.QueueEmpty()) + .verifyTimeout(Duration.ofMillis(500)); + } + + @Test + void publishMessagesDynamoDbOnly() { + final MessageProtos.Envelope dynamoDbMessage = insertDynamoDbMessage(generateRandomMessage()); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher)) + .expectNext(new MessageStreamEntry.Envelope(dynamoDbMessage)) + .expectNext(new MessageStreamEntry.QueueEmpty()) + .verifyTimeout(Duration.ofMillis(500)); + } + + @Test + void publishMessagesRedisOnly() { + final MessageProtos.Envelope redisMessage = insertRedisMessage(generateRandomMessage()); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher)) + .expectNext(new MessageStreamEntry.Envelope(redisMessage)) + .expectNext(new MessageStreamEntry.QueueEmpty()) + .verifyTimeout(Duration.ofMillis(500)); + } + + @Test + void publishMessagesTailNewRedisMessages() { + final MessageProtos.Envelope dynamoDbMessage = insertDynamoDbMessage(generateRandomMessage()); + final MessageProtos.Envelope redisMessage = insertRedisMessage(generateRandomMessage()); + + final MessageProtos.Envelope newArrivalRedisMessage = generateRandomMessage(); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + final CountDownLatch countDownLatch = new CountDownLatch(2); + + Thread.ofVirtual().start(() -> { + try { + countDownLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + + deleteRedisMessage(redisMessage); + deleteDynamoDbMessage(dynamoDbMessage); + + insertRedisMessage(newArrivalRedisMessage); + messagePublisher.handleNewMessageAvailable(); + }); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher) + .doOnNext(_ -> countDownLatch.countDown())) + .expectNext(new MessageStreamEntry.Envelope(dynamoDbMessage)) + .expectNext(new MessageStreamEntry.Envelope(redisMessage)) + .expectNext(new MessageStreamEntry.QueueEmpty()) + .expectNext(new MessageStreamEntry.Envelope(newArrivalRedisMessage)) + .verifyTimeout(Duration.ofMillis(500)); + } + + @Test + void publishMessagesTailNewPersistedMessages() { + final MessageProtos.Envelope dynamoDbMessage = insertDynamoDbMessage(generateRandomMessage()); + final MessageProtos.Envelope redisMessage = insertRedisMessage(generateRandomMessage()); + + final MessageProtos.Envelope persistedMessage = generateRandomMessage(); + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + final CountDownLatch countDownLatch = new CountDownLatch(2); + + Thread.ofVirtual().start(() -> { + try { + countDownLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + + deleteRedisMessage(redisMessage); + deleteDynamoDbMessage(dynamoDbMessage); + + insertDynamoDbMessage(persistedMessage); + messagePublisher.handleMessagesPersisted(); + }); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher) + .doOnNext(_ -> countDownLatch.countDown())) + .expectNext(new MessageStreamEntry.Envelope(dynamoDbMessage)) + .expectNext(new MessageStreamEntry.Envelope(redisMessage)) + .expectNext(new MessageStreamEntry.QueueEmpty()) + .expectNext(new MessageStreamEntry.Envelope(persistedMessage)) + .verifyTimeout(Duration.ofMillis(500)); + } + + @Test + void publishMessagesConsumerConflict() { + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + Thread.ofVirtual().start(() -> { + try { + countDownLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + + messagePublisher.handleConflictingMessageConsumer(); + }); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher) + .doOnSubscribe(_ -> countDownLatch.countDown())) + .expectError(ConflictingMessageConsumerException.class) + .verify(); + + verify(redisMessageAvailabilityManager, timeout(1_000)).handleClientConnected(DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice.getId(), messagePublisher); + verify(redisMessageAvailabilityManager, timeout(1_000)).handleClientDisconnected(DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice.getId()); + } + + @ParameterizedTest + @CsvSource({ + "207, 173", + "323, 0", + "0, 221", + }) + void publishMessagesMultipleRequests(final int persistedMessageCount, final int cachedMessageCount) { + final List expectedMessages = new ArrayList<>(persistedMessageCount + cachedMessageCount); + + for (int i = 0; i < persistedMessageCount; i++) { + expectedMessages.add(insertDynamoDbMessage(generateRandomMessage())); + } + + for (int i = 0; i < cachedMessageCount; i++) { + expectedMessages.add(insertRedisMessage(generateRandomMessage())); + } + + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + final List publishedMessages = new ArrayList<>(expectedMessages.size()); + + final CompletableFuture queueEmptyFuture = new CompletableFuture<>(); + + final Disposable disposable = JdkFlowAdapter.flowPublisherToFlux(messagePublisher) + .limitRate(20) + .doOnNext(entry -> { + if (entry instanceof MessageStreamEntry.Envelope(final MessageProtos.Envelope message)) { + publishedMessages.add(message); + } else if (entry instanceof MessageStreamEntry.QueueEmpty) { + queueEmptyFuture.complete(null); + } + }) + .subscribe(); + + queueEmptyFuture.thenRun(disposable::dispose).join(); + + assertEquals(expectedMessages, publishedMessages); + } + + @Test + void publishQueueEmptySignalDeferred() { + final MessageProtos.Envelope redisMessage = insertRedisMessage(generateRandomMessage()); + + { + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, + DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher), 1) + .expectNext(new MessageStreamEntry.Envelope(redisMessage)) + .verifyTimeout(Duration.ofMillis(500)); + } + + { + final RedisDynamoDbMessagePublisher messagePublisher = + new RedisDynamoDbMessagePublisher(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, + DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + StepVerifier.create(JdkFlowAdapter.flowPublisherToFlux(messagePublisher), 2) + .expectNext(new MessageStreamEntry.Envelope(redisMessage)) + .expectNext(new MessageStreamEntry.QueueEmpty()) + .verifyTimeout(Duration.ofMillis(500)); + } + } + + private MessageProtos.Envelope insertRedisMessage(final MessageProtos.Envelope message) { + messagesCache.insert(UUID.fromString(message.getServerGuid()), + DESTINATION_SERVICE_IDENTIFIER.uuid(), + destinationDevice.getId(), + message) + .join(); + + return message; + } + + private void deleteRedisMessage(final MessageProtos.Envelope message) { + messagesCache.remove(DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice.getId(), UUID.fromString(message.getServerGuid())).join(); + } + + private MessageProtos.Envelope insertDynamoDbMessage(final MessageProtos.Envelope message) { + messagesDynamoDb.store(List.of(message), DESTINATION_SERVICE_IDENTIFIER.uuid(), destinationDevice); + + return message; + } + + private void deleteDynamoDbMessage(final MessageProtos.Envelope message) { + messagesDynamoDb.deleteMessage(DESTINATION_SERVICE_IDENTIFIER.uuid(), + destinationDevice, + UUID.fromString(message.getServerGuid()), + message.getServerTimestamp()) + .join(); + } + + private static MessageProtos.Envelope generateRandomMessage() { + + final long timestamp = SERIAL_TIMESTAMP.incrementAndGet(); + + final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder() + .setClientTimestamp(timestamp) + .setServerTimestamp(timestamp) + .setContent(ByteString.copyFromUtf8(RandomStringUtils.secure().nextAlphanumeric(256))) + .setType(MessageProtos.Envelope.Type.CIPHERTEXT) + .setServerGuid(UUID.randomUUID().toString()) + .setDestinationServiceId(DESTINATION_SERVICE_IDENTIFIER.toServiceIdentifierString()); + + return envelopeBuilder.build(); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessageStreamTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessageStreamTest.java new file mode 100644 index 000000000..3bd3e802c --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/RedisDynamoDbMessageStreamTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.storage; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyByte; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; + +class RedisDynamoDbMessageStreamTest { + + private MessagesDynamoDb messagesDynamoDb; + private MessagesCache messagesCache; + + private RedisDynamoDbMessageStream redisDynamoDbMessageStream; + + private Device device; + + private static final UUID ACCOUNT_IDENTIFIER = UUID.randomUUID(); + private static final byte DEVICE_ID = Device.PRIMARY_ID; + + @BeforeEach + void setUp() { + messagesDynamoDb = mock(MessagesDynamoDb.class); + messagesCache = mock(MessagesCache.class); + + device = mock(Device.class); + when(device.getId()).thenReturn(DEVICE_ID); + + redisDynamoDbMessageStream = new RedisDynamoDbMessageStream(messagesDynamoDb, + messagesCache, + mock(RedisMessageAvailabilityManager.class), + ACCOUNT_IDENTIFIER, + device); + + when(messagesDynamoDb.deleteMessage(any(), any(), any(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + when(messagesCache.remove(any(), anyByte(), any(UUID.class))) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + } + + @Test + void acknowledgeMessageDynamoDb() { + final MessageProtos.Envelope message = generateMessage(); + final UUID messageGuid = UUID.fromString(message.getServerGuid()); + final long serverTimestamp = message.getServerTimestamp(); + + when(messagesDynamoDb.deleteMessage(ACCOUNT_IDENTIFIER, device, messageGuid, serverTimestamp)) + .thenReturn(CompletableFuture.completedFuture(Optional.of(message))); + + redisDynamoDbMessageStream.acknowledgeMessage(message).join(); + + verify(messagesCache).remove(ACCOUNT_IDENTIFIER, DEVICE_ID, messageGuid); + verify(messagesDynamoDb).deleteMessage(ACCOUNT_IDENTIFIER, device, messageGuid, serverTimestamp); + } + + @Test + void acknowledgeMessageRedis() { + final MessageProtos.Envelope message = generateMessage(); + final UUID messageGuid = UUID.fromString(message.getServerGuid()); + final long serverTimestamp = message.getServerTimestamp(); + + when(messagesCache.remove(ACCOUNT_IDENTIFIER, DEVICE_ID, messageGuid)) + .thenReturn(CompletableFuture.completedFuture(Optional.of(RemovedMessage.fromEnvelope(message)))); + + redisDynamoDbMessageStream.acknowledgeMessage(message).join(); + + verify(messagesCache).remove(ACCOUNT_IDENTIFIER, DEVICE_ID, messageGuid); + verify(messagesDynamoDb, never()).deleteMessage(any(), any(), any(), anyLong()); + } + + private static MessageProtos.Envelope generateMessage() { + return MessageProtos.Envelope.newBuilder() + .setServerGuid(UUID.randomUUID().toString()) + .setDestinationServiceId(new AciServiceIdentifier(ACCOUNT_IDENTIFIER).toServiceIdentifierString()) + .setServerTimestamp(System.currentTimeMillis()) + .setClientTimestamp(System.currentTimeMillis()) + .setType(MessageProtos.Envelope.Type.CIPHERTEXT) + .build(); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index abca22849..3fb04ea36 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -53,6 +53,7 @@ import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; @@ -81,6 +82,7 @@ class WebSocketConnectionIntegrationTest { private ExecutorService sharedExecutorService; private MessagesDynamoDb messagesDynamoDb; private MessagesCache messagesCache; + private RedisMessageAvailabilityManager redisMessageAvailabilityManager; private ReportMessageManager reportMessageManager; private Account account; private Device device; @@ -103,6 +105,7 @@ class WebSocketConnectionIntegrationTest { messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7), sharedExecutorService); + redisMessageAvailabilityManager = new RedisMessageAvailabilityManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, sharedExecutorService); reportMessageManager = mock(ReportMessageManager.class); account = mock(Account.class); device = mock(Device.class); @@ -130,7 +133,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessages(final int persistedMessageCount, final int cachedMessageCount) { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService, Clock.systemUTC()), + new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()), new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), @@ -219,7 +222,7 @@ class WebSocketConnectionIntegrationTest { void testProcessStoredMessagesClientClosed() { final WebSocketConnection webSocketConnection = new WebSocketConnection( mock(ReceiptSender.class), - new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager, sharedExecutorService, Clock.systemUTC()), + new MessagesManager(messagesDynamoDb, messagesCache, redisMessageAvailabilityManager, reportMessageManager, sharedExecutorService, Clock.systemUTC()), new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class),