diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 82166c2b5..1d3a11b0e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -199,7 +199,7 @@ import org.whispersystems.textsecuregcm.push.ProvisioningManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; @@ -655,8 +655,8 @@ public class WhisperServerService extends Application spamFilters = ServiceLoader.load(SpamFilter.class) @@ -1145,7 +1145,7 @@ public class WhisperServerService extends Application { - if (!webSocketConnectionEventManager.isLocallyPresent(auth.accountIdentifier(), auth.deviceId())) { + if (!redisMessageAvailabilityManager.isLocallyPresent(auth.accountIdentifier(), auth.deviceId())) { final Duration age = Duration.between(context.getClient().getCreated(), Instant.now()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageAvailabilityListener.java similarity index 62% rename from service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventListener.java rename to service/src/main/java/org/whispersystems/textsecuregcm/push/MessageAvailabilityListener.java index 6bc6af1f4..6af8643b2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/MessageAvailabilityListener.java @@ -5,11 +5,15 @@ package org.whispersystems.textsecuregcm.push; +import java.util.UUID; + /** - * A WebSocket connection event listener handles message availability and presence events related to a client's open - * WebSocket connection. Handler methods are run on dedicated threads and may safely perform blocking operations. + * A message availability listener handles message availability and presence events related to a client's open message + * stream. Handler methods are run on dedicated threads and may safely perform blocking operations. + * + * @see RedisMessageAvailabilityManager#handleClientConnected(UUID, byte, MessageAvailabilityListener) */ -public interface WebSocketConnectionEventListener { +public interface MessageAvailabilityListener { /** * Indicates that a new message is available in the connected client's message queue. diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManager.java similarity index 90% rename from service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManager.java rename to service/src/main/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManager.java index 05fdc1877..3e481ca1a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManager.java @@ -39,22 +39,22 @@ import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.textsecuregcm.util.Util; /** - * The WebSocket connection event manager distributes events related to client presence and message availability to + * The Redis message availability manager distributes events related to client presence and message availability to * registered listeners. In the current Signal server implementation, clients generally interact with the service by * opening a dual-purpose WebSocket. The WebSocket serves as both a delivery mechanism for messages and as a channel * for the client to issue API requests to the server. Clients are considered "present" if they have an open WebSocket - * connection and are therefore likely to receive messages as soon as they're delivered to the server. WebSocket - * connection managers make a best effort to ensure that clients have at most one active message delivery channel at - * a time. + * connection and are therefore likely to receive messages as soon as they're delivered to the server. Redis message + * availability managers ensure that clients have at most one active message delivery channel at a time on a + * best-effort basis. * - * @implNote The WebSocket connection event manager uses the Redis 7 sharded pub/sub system to distribute events. This + * @implNote The Redis message availability manager uses the Redis 7 sharded pub/sub system to distribute events. This * system makes a best effort to ensure that a given client has only a single open connection across the fleet of * servers, but cannot guarantee at-most-one behavior. * - * @see WebSocketConnectionEventListener + * @see MessageAvailabilityListener * @see org.whispersystems.textsecuregcm.storage.MessagesManager#insert(UUID, Map) */ -public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter implements Managed { +public class RedisMessageAvailabilityManager extends RedisClusterPubSubAdapter implements Managed { private final AccountsManager accountsManager; private final PushNotificationManager pushNotificationManager; @@ -68,7 +68,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter pubSubConnection; - private final Map listenersByAccountAndDeviceIdentifier; + private final Map listenersByAccountAndDeviceIdentifier; private final UUID serverId = UUID.randomUUID(); @@ -80,31 +80,31 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter handleClientConnected(final UUID accountIdentifier, final byte deviceId, final WebSocketConnectionEventListener listener) { + public CompletionStage handleClientConnected(final UUID accountIdentifier, final byte deviceId, final MessageAvailabilityListener listener) { if (pubSubConnection == null) { throw new IllegalStateException("WebSocket connection event manager not started"); } final byte[] eventChannel = getClientEventChannel(accountIdentifier, deviceId); - final AtomicReference displacedListener = new AtomicReference<>(); + final AtomicReference displacedListener = new AtomicReference<>(); final AtomicReference> subscribeFuture = new AtomicReference<>(); // Note that we're relying on some specific implementation details of `ConcurrentHashMap#compute(...)`. In @@ -315,7 +315,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter args = new ArrayList<>(Arrays.asList( diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java index bb654ce93..1629b657a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCacheUnlockQueueScript.java @@ -8,7 +8,7 @@ package org.whispersystems.textsecuregcm.storage; import io.lettuce.core.ScriptOutputType; import org.whispersystems.textsecuregcm.push.ClientEvent; import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import java.io.IOException; @@ -35,7 +35,7 @@ class MessagesCacheUnlockQueueScript { void execute(final UUID accountIdentifier, final byte deviceId) { final List keys = List.of( MessagesCache.getPersistInProgressKey(accountIdentifier, deviceId), // persistInProgressKey - WebSocketConnectionEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey + RedisMessageAvailabilityManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey ); unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 92a3426d7..45db80882 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -33,6 +33,7 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; @@ -82,7 +83,7 @@ public class MessagesManager { * * @return a map of device IDs to a device's presence state (i.e. if the device has an active event listener) * - * @see org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager + * @see RedisMessageAvailabilityManager */ public Map insert(final UUID accountIdentifier, final Map messagesByDeviceId) { return insertAsync(accountIdentifier, messagesByDeviceId).join(); @@ -127,7 +128,7 @@ public class MessagesManager { * @return a map of accounts to maps of device IDs to a device's presence state (i.e. if the device has an active * event listener) * - * @see org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager + * @see RedisMessageAvailabilityManager */ public CompletableFuture>> insertMultiRecipientMessage( final SealedSenderMultiRecipientMessage multiRecipientMessage, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index 9048f56fa..5c7a488d2 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -19,7 +19,7 @@ import org.whispersystems.textsecuregcm.identity.IdentityType; import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor; import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; @@ -49,7 +49,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final MessageMetrics messageMetrics; private final PushNotificationManager pushNotificationManager; private final PushNotificationScheduler pushNotificationScheduler; - private final WebSocketConnectionEventManager webSocketConnectionEventManager; + private final RedisMessageAvailabilityManager redisMessageAvailabilityManager; private final DisconnectionRequestManager disconnectionRequestManager; private final ScheduledExecutorService scheduledExecutorService; private final Scheduler messageDeliveryScheduler; @@ -67,7 +67,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { final MessageMetrics messageMetrics, final PushNotificationManager pushNotificationManager, final PushNotificationScheduler pushNotificationScheduler, - final WebSocketConnectionEventManager webSocketConnectionEventManager, + final RedisMessageAvailabilityManager redisMessageAvailabilityManager, final DisconnectionRequestManager disconnectionRequestManager, final ScheduledExecutorService scheduledExecutorService, final Scheduler messageDeliveryScheduler, @@ -81,7 +81,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { this.messageMetrics = messageMetrics; this.pushNotificationManager = pushNotificationManager; this.pushNotificationScheduler = pushNotificationScheduler; - this.webSocketConnectionEventManager = webSocketConnectionEventManager; + this.redisMessageAvailabilityManager = redisMessageAvailabilityManager; this.disconnectionRequestManager = disconnectionRequestManager; this.scheduledExecutorService = scheduledExecutorService; this.messageDeliveryScheduler = messageDeliveryScheduler; @@ -145,7 +145,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { // receive push notifications for inbound messages. We should do this first because, at this point, the // connection has already closed and attempts to actually deliver a message via the connection will not succeed. // It's preferable to start sending push notifications as soon as possible. - webSocketConnectionEventManager.handleClientDisconnected(auth.accountIdentifier(), auth.deviceId()); + redisMessageAvailabilityManager.handleClientDisconnected(auth.accountIdentifier(), auth.deviceId()); // Finally, stop trying to deliver messages and send a push notification if the connection is aware of any // undelivered messages. @@ -161,7 +161,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { // Finally, we register this client's presence, which suppresses push notifications. We do this last because // receiving extra push notifications is generally preferable to missing out on a push notification. - webSocketConnectionEventManager.handleClientConnected(auth.accountIdentifier(), auth.deviceId(), connection); + redisMessageAvailabilityManager.handleClientConnected(auth.accountIdentifier(), auth.deviceId(), connection); } catch (final Exception e) { log.warn("Failed to initialize websocket", e); context.getClient().close(1011, "Unexpected error initializing connection"); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index a9d8148dc..5216336b9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -51,7 +51,7 @@ import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener; +import org.whispersystems.textsecuregcm.push.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.Device; @@ -66,7 +66,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -public class WebSocketConnection implements WebSocketConnectionEventListener, DisconnectionRequestListener { +public class WebSocketConnection implements MessageAvailabilityListener, DisconnectionRequestListener { private static final DistributionSummary messageTime = Metrics.summary( name(MessageController.class, "messageDeliveryDuration")); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index bc62b8d2a..8611e37a1 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -43,7 +43,7 @@ import org.whispersystems.textsecuregcm.push.APNSender; import org.whispersystems.textsecuregcm.push.FcmSender; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; @@ -319,15 +319,15 @@ record CommandDependencies( configuration.getDynamoDbTables().getPushNotificationExperimentSamples().getTableName(), Clock.systemUTC()); - WebSocketConnectionEventManager webSocketConnectionEventManager = - new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor); + RedisMessageAvailabilityManager redisMessageAvailabilityManager = + new RedisMessageAvailabilityManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor); final DynamoDbRecoveryManager dynamoDbRecoveryManager = new DynamoDbRecoveryManager(accounts, phoneNumberIdentifiers); environment.lifecycle().manage(apnSender); environment.lifecycle().manage(disconnectionRequestManager); - environment.lifecycle().manage(webSocketConnectionEventManager); + environment.lifecycle().manage(redisMessageAvailabilityManager); environment.lifecycle().manage(new ManagedAwsCrt()); return new CommandDependencies( diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java similarity index 85% rename from service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java rename to service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java index c71a056aa..8b3fc9750 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/WebSocketConnectionEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/RedisMessageAvailabilityManagerTest.java @@ -45,10 +45,10 @@ import org.whispersystems.textsecuregcm.tests.util.MockRedisFuture; import org.whispersystems.textsecuregcm.tests.util.RedisClusterHelper; @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) -class WebSocketConnectionEventManagerTest { +class RedisMessageAvailabilityManagerTest { - private WebSocketConnectionEventManager localEventManager; - private WebSocketConnectionEventManager remoteEventManager; + private RedisMessageAvailabilityManager localEventManager; + private RedisMessageAvailabilityManager remoteEventManager; private static ExecutorService webSocketConnectionEventExecutor; private static ExecutorService asyncOperationQueueingExecutor; @@ -56,7 +56,7 @@ class WebSocketConnectionEventManagerTest { @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); - private static class WebSocketConnectionEventAdapter implements WebSocketConnectionEventListener { + private static class MessageAvailabilityAdapter implements MessageAvailabilityListener { @Override public void handleNewMessageAvailable() { @@ -79,13 +79,13 @@ class WebSocketConnectionEventManagerTest { @BeforeEach void setUp() { - localEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class), + localEventManager = new RedisMessageAvailabilityManager(mock(AccountsManager.class), mock(PushNotificationManager.class), REDIS_CLUSTER_EXTENSION.getRedisCluster(), webSocketConnectionEventExecutor, asyncOperationQueueingExecutor); - remoteEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class), + remoteEventManager = new RedisMessageAvailabilityManager(mock(AccountsManager.class), mock(PushNotificationManager.class), REDIS_CLUSTER_EXTENSION.getRedisCluster(), webSocketConnectionEventExecutor, @@ -117,7 +117,7 @@ class WebSocketConnectionEventManagerTest { final AtomicBoolean secondListenerDisplaced = new AtomicBoolean(false); - localEventManager.handleClientConnected(accountIdentifier, deviceId, new WebSocketConnectionEventAdapter() { + localEventManager.handleClientConnected(accountIdentifier, deviceId, new MessageAvailabilityAdapter() { @Override public void handleConflictingMessageReader() { synchronized (firstListenerDisplaced) { @@ -130,10 +130,10 @@ class WebSocketConnectionEventManagerTest { assertFalse(firstListenerDisplaced.get()); assertFalse(secondListenerDisplaced.get()); - final WebSocketConnectionEventManager displacingManager = + final RedisMessageAvailabilityManager displacingManager = displaceRemotely ? remoteEventManager : localEventManager; - displacingManager.handleClientConnected(accountIdentifier, deviceId, new WebSocketConnectionEventAdapter() { + displacingManager.handleClientConnected(accountIdentifier, deviceId, new MessageAvailabilityAdapter() { @Override public void handleConflictingMessageReader() { secondListenerDisplaced.set(true); @@ -158,7 +158,7 @@ class WebSocketConnectionEventManagerTest { assertFalse(localEventManager.isLocallyPresent(accountIdentifier, deviceId)); assertFalse(remoteEventManager.isLocallyPresent(accountIdentifier, deviceId)); - localEventManager.handleClientConnected(accountIdentifier, deviceId, new WebSocketConnectionEventAdapter()) + localEventManager.handleClientConnected(accountIdentifier, deviceId, new MessageAvailabilityAdapter()) .toCompletableFuture() .join(); @@ -188,7 +188,7 @@ class WebSocketConnectionEventManagerTest { .binaryPubSubAsyncCommands(pubSubAsyncCommands) .build(); - final WebSocketConnectionEventManager eventManager = new WebSocketConnectionEventManager( + final RedisMessageAvailabilityManager eventManager = new RedisMessageAvailabilityManager( mock(AccountsManager.class), mock(PushNotificationManager.class), clusterClient, @@ -199,7 +199,7 @@ class WebSocketConnectionEventManagerTest { final UUID firstAccountIdentifier = UUID.randomUUID(); final byte firstDeviceId = Device.PRIMARY_ID; - final int firstSlot = SlotHash.getSlot(WebSocketConnectionEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); + final int firstSlot = SlotHash.getSlot(RedisMessageAvailabilityManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); final UUID secondAccountIdentifier; final byte secondDeviceId = firstDeviceId + 1; @@ -210,15 +210,15 @@ class WebSocketConnectionEventManagerTest { do { candidateIdentifier = UUID.randomUUID(); - } while (SlotHash.getSlot(WebSocketConnectionEventManager.getClientEventChannel(candidateIdentifier, secondDeviceId)) == firstSlot); + } while (SlotHash.getSlot(RedisMessageAvailabilityManager.getClientEventChannel(candidateIdentifier, secondDeviceId)) == firstSlot); secondAccountIdentifier = candidateIdentifier; } - eventManager.handleClientConnected(firstAccountIdentifier, firstDeviceId, new WebSocketConnectionEventAdapter()).toCompletableFuture().join(); - eventManager.handleClientConnected(secondAccountIdentifier, secondDeviceId, new WebSocketConnectionEventAdapter()).toCompletableFuture().join(); + eventManager.handleClientConnected(firstAccountIdentifier, firstDeviceId, new MessageAvailabilityAdapter()).toCompletableFuture().join(); + eventManager.handleClientConnected(secondAccountIdentifier, secondDeviceId, new MessageAvailabilityAdapter()).toCompletableFuture().join(); - final int secondSlot = SlotHash.getSlot(WebSocketConnectionEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); + final int secondSlot = SlotHash.getSlot(RedisMessageAvailabilityManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); final String firstNodeId = UUID.randomUUID().toString(); @@ -241,8 +241,8 @@ class WebSocketConnectionEventManagerTest { List.of(firstBeforeNode), List.of(firstAfterNode, secondAfterNode))); - verify(pubSubCommands).ssubscribe(WebSocketConnectionEventManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); - verify(pubSubCommands, never()).ssubscribe(WebSocketConnectionEventManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); + verify(pubSubCommands).ssubscribe(RedisMessageAvailabilityManager.getClientEventChannel(secondAccountIdentifier, secondDeviceId)); + verify(pubSubCommands, never()).ssubscribe(RedisMessageAvailabilityManager.getClientEventChannel(firstAccountIdentifier, firstDeviceId)); } @Test @@ -256,7 +256,7 @@ class WebSocketConnectionEventManagerTest { .binaryPubSubAsyncCommands(pubSubAsyncCommands) .build(); - final WebSocketConnectionEventManager eventManager = new WebSocketConnectionEventManager( + final RedisMessageAvailabilityManager eventManager = new RedisMessageAvailabilityManager( mock(AccountsManager.class), mock(PushNotificationManager.class), clusterClient, @@ -271,21 +271,21 @@ class WebSocketConnectionEventManagerTest { final UUID noListenerAccountIdentifier = UUID.randomUUID(); final byte noListenerDeviceId = listenerDeviceId + 1; - eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new WebSocketConnectionEventAdapter()) + eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new MessageAvailabilityAdapter()) .toCompletableFuture() .join(); eventManager.unsubscribeIfMissingListener( - new WebSocketConnectionEventManager.AccountAndDeviceIdentifier(listenerAccountIdentifier, listenerDeviceId)); + new RedisMessageAvailabilityManager.AccountAndDeviceIdentifier(listenerAccountIdentifier, listenerDeviceId)); eventManager.unsubscribeIfMissingListener( - new WebSocketConnectionEventManager.AccountAndDeviceIdentifier(noListenerAccountIdentifier, noListenerDeviceId)); + new RedisMessageAvailabilityManager.AccountAndDeviceIdentifier(noListenerAccountIdentifier, noListenerDeviceId)); verify(pubSubAsyncCommands, never()) - .sunsubscribe(WebSocketConnectionEventManager.getClientEventChannel(listenerAccountIdentifier, listenerDeviceId)); + .sunsubscribe(RedisMessageAvailabilityManager.getClientEventChannel(listenerAccountIdentifier, listenerDeviceId)); verify(pubSubAsyncCommands) - .sunsubscribe(WebSocketConnectionEventManager.getClientEventChannel(noListenerAccountIdentifier, noListenerDeviceId)); + .sunsubscribe(RedisMessageAvailabilityManager.getClientEventChannel(noListenerAccountIdentifier, noListenerDeviceId)); } @Test @@ -314,7 +314,7 @@ class WebSocketConnectionEventManagerTest { .binaryPubSubAsyncCommands(pubSubAsyncCommands) .build(); - final WebSocketConnectionEventManager eventManager = new WebSocketConnectionEventManager( + final RedisMessageAvailabilityManager eventManager = new RedisMessageAvailabilityManager( accountsManager, pushNotificationManager, clusterClient, @@ -323,7 +323,7 @@ class WebSocketConnectionEventManagerTest { eventManager.start(); - eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new WebSocketConnectionEventAdapter()) + eventManager.handleClientConnected(listenerAccountIdentifier, listenerDeviceId, new MessageAvailabilityAdapter()) .toCompletableFuture() .join(); @@ -333,11 +333,11 @@ class WebSocketConnectionEventManagerTest { .toByteArray(); eventManager.smessage(mock(RedisClusterNode.class), - WebSocketConnectionEventManager.getClientEventChannel(listenerAccountIdentifier, listenerDeviceId), + RedisMessageAvailabilityManager.getClientEventChannel(listenerAccountIdentifier, listenerDeviceId), newMessagePayload); eventManager.smessage(mock(RedisClusterNode.class), - WebSocketConnectionEventManager.getClientEventChannel(noListenerAccountIdentifier, noListenerDeviceId), + RedisMessageAvailabilityManager.getClientEventChannel(noListenerAccountIdentifier, noListenerDeviceId), newMessagePayload); verify(pushNotificationManager).sendNewMessageNotification(noListenerAccount, noListenerDeviceId, true); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 9f7284b3c..0754c8596 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -30,13 +30,11 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.whispersystems.textsecuregcm.auth.DisconnectionRequestManager; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager; import org.whispersystems.textsecuregcm.push.PushNotificationManager; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.MessageAvailabilityListener; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -59,7 +57,7 @@ class MessagePersisterIntegrationTest { private ExecutorService asyncOperationQueueingExecutor; private MessagesCache messagesCache; private MessagesManager messagesManager; - private WebSocketConnectionEventManager webSocketConnectionEventManager; + private RedisMessageAvailabilityManager redisMessageAvailabilityManager; private MessagePersister messagePersister; private Account account; @@ -90,13 +88,13 @@ class MessagePersisterIntegrationTest { websocketConnectionEventExecutor = Executors.newVirtualThreadPerTaskExecutor(); asyncOperationQueueingExecutor = Executors.newSingleThreadExecutor(); - webSocketConnectionEventManager = new WebSocketConnectionEventManager(mock(AccountsManager.class), + redisMessageAvailabilityManager = new RedisMessageAvailabilityManager(mock(AccountsManager.class), mock(PushNotificationManager.class), REDIS_CLUSTER_EXTENSION.getRedisCluster(), websocketConnectionEventExecutor, asyncOperationQueueingExecutor); - webSocketConnectionEventManager.start(); + redisMessageAvailabilityManager.start(); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY, 1); @@ -127,7 +125,7 @@ class MessagePersisterIntegrationTest { messageDeliveryScheduler.dispose(); - webSocketConnectionEventManager.stop(); + redisMessageAvailabilityManager.stop(); } @Test @@ -156,7 +154,7 @@ class MessagePersisterIntegrationTest { final AtomicBoolean messagesPersisted = new AtomicBoolean(false); - webSocketConnectionEventManager.handleClientConnected(account.getUuid(), Device.PRIMARY_ID, new WebSocketConnectionEventListener() { + redisMessageAvailabilityManager.handleClientConnected(account.getUuid(), Device.PRIMARY_ID, new MessageAvailabilityListener() { @Override public void handleNewMessageAvailable() { } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java index d21710f05..ab718a3ff 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheInsertScriptTest.java @@ -20,7 +20,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; @@ -100,7 +100,7 @@ class MessagesCacheInsertScriptTest { REDIS_CLUSTER_EXTENSION.getRedisCluster().createBinaryPubSubConnection(); pubSubClusterConnection.usePubSubConnection(connection -> - connection.sync().ssubscribe(WebSocketConnectionEventManager.getClientEventChannel(destinationUuid, deviceId))); + connection.sync().ssubscribe(RedisMessageAvailabilityManager.getClientEventChannel(destinationUuid, deviceId))); assertTrue(insertScript.executeAsync(destinationUuid, deviceId, MessageProtos.Envelope.newBuilder() .setServerTimestamp(Instant.now().getEpochSecond()) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 93677cb70..2575600af 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -64,7 +64,7 @@ import org.whispersystems.textsecuregcm.metrics.MessageMetrics; import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager; +import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; @@ -125,7 +125,7 @@ class WebSocketConnectionTest { new WebSocketAccountAuthenticator(accountAuthenticator); AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, receiptSender, messagesManager, new MessageMetrics(), mock(PushNotificationManager.class), mock(PushNotificationScheduler.class), - mock(WebSocketConnectionEventManager.class), mock(DisconnectionRequestManager.class), retrySchedulingExecutor, + mock(RedisMessageAvailabilityManager.class), mock(DisconnectionRequestManager.class), retrySchedulingExecutor, messageDeliveryScheduler, clientReleaseManager, mock(MessageDeliveryLoopMonitor.class), mock(ExperimentEnrollmentManager.class)); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);