Rename WebSocketConnectionEventManager/Listener to MessageAvailabilityManager/Listener

This commit is contained in:
Jon Chambers
2025-07-23 14:38:53 -04:00
committed by Jon Chambers
parent cf222e1105
commit 038c68c594
14 changed files with 100 additions and 97 deletions

View File

@@ -5,11 +5,15 @@
package org.whispersystems.textsecuregcm.push;
import java.util.UUID;
/**
* A WebSocket connection event listener handles message availability and presence events related to a client's open
* WebSocket connection. Handler methods are run on dedicated threads and may safely perform blocking operations.
* A message availability listener handles message availability and presence events related to a client's open message
* stream. Handler methods are run on dedicated threads and may safely perform blocking operations.
*
* @see RedisMessageAvailabilityManager#handleClientConnected(UUID, byte, MessageAvailabilityListener)
*/
public interface WebSocketConnectionEventListener {
public interface MessageAvailabilityListener {
/**
* Indicates that a new message is available in the connected client's message queue.

View File

@@ -39,22 +39,22 @@ import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
/**
* The WebSocket connection event manager distributes events related to client presence and message availability to
* The Redis message availability manager distributes events related to client presence and message availability to
* registered listeners. In the current Signal server implementation, clients generally interact with the service by
* opening a dual-purpose WebSocket. The WebSocket serves as both a delivery mechanism for messages and as a channel
* for the client to issue API requests to the server. Clients are considered "present" if they have an open WebSocket
* connection and are therefore likely to receive messages as soon as they're delivered to the server. WebSocket
* connection managers make a best effort to ensure that clients have at most one active message delivery channel at
* a time.
* connection and are therefore likely to receive messages as soon as they're delivered to the server. Redis message
* availability managers ensure that clients have at most one active message delivery channel at a time on a
* best-effort basis.
*
* @implNote The WebSocket connection event manager uses the Redis 7 sharded pub/sub system to distribute events. This
* @implNote The Redis message availability manager uses the Redis 7 sharded pub/sub system to distribute events. This
* system makes a best effort to ensure that a given client has only a single open connection across the fleet of
* servers, but cannot guarantee at-most-one behavior.
*
* @see WebSocketConnectionEventListener
* @see MessageAvailabilityListener
* @see org.whispersystems.textsecuregcm.storage.MessagesManager#insert(UUID, Map)
*/
public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<byte[], byte[]> implements Managed {
public class RedisMessageAvailabilityManager extends RedisClusterPubSubAdapter<byte[], byte[]> implements Managed {
private final AccountsManager accountsManager;
private final PushNotificationManager pushNotificationManager;
@@ -68,7 +68,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
@Nullable
private FaultTolerantPubSubClusterConnection<byte[], byte[]> pubSubConnection;
private final Map<AccountAndDeviceIdentifier, WebSocketConnectionEventListener> listenersByAccountAndDeviceIdentifier;
private final Map<AccountAndDeviceIdentifier, MessageAvailabilityListener> listenersByAccountAndDeviceIdentifier;
private final UUID serverId = UUID.randomUUID();
@@ -80,31 +80,31 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
.toByteArray();
private static final Counter PUBLISH_CLIENT_CONNECTION_EVENT_ERROR_COUNTER =
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "publishClientConnectionEventError"));
Metrics.counter(MetricsUtil.name(RedisMessageAvailabilityManager.class, "publishClientConnectionEventError"));
private static final Counter UNSUBSCRIBE_ERROR_COUNTER =
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "unsubscribeError"));
Metrics.counter(MetricsUtil.name(RedisMessageAvailabilityManager.class, "unsubscribeError"));
private static final Counter PUB_SUB_EVENT_WITHOUT_LISTENER_COUNTER =
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "pubSubEventWithoutListener"));
Metrics.counter(MetricsUtil.name(RedisMessageAvailabilityManager.class, "pubSubEventWithoutListener"));
private static final Counter MESSAGE_AVAILABLE_WITHOUT_LISTENER_COUNTER =
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "messageAvailableWithoutListener"));
Metrics.counter(MetricsUtil.name(RedisMessageAvailabilityManager.class, "messageAvailableWithoutListener"));
private static final String LISTENER_GAUGE_NAME =
MetricsUtil.name(WebSocketConnectionEventManager.class, "listeners");
MetricsUtil.name(RedisMessageAvailabilityManager.class, "listeners");
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnectionEventManager.class);
private static final Logger logger = LoggerFactory.getLogger(RedisMessageAvailabilityManager.class);
@VisibleForTesting
record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {
}
public WebSocketConnectionEventManager(final AccountsManager accountsManager,
final PushNotificationManager pushNotificationManager,
final FaultTolerantRedisClusterClient clusterClient,
final Executor listenerEventExecutor,
final Executor asyncOperationQueueingExecutor) {
public RedisMessageAvailabilityManager(final AccountsManager accountsManager,
final PushNotificationManager pushNotificationManager,
final FaultTolerantRedisClusterClient clusterClient,
final Executor listenerEventExecutor,
final Executor asyncOperationQueueingExecutor) {
this.accountsManager = accountsManager;
this.pushNotificationManager = pushNotificationManager;
@@ -140,7 +140,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
/**
* Marks the given device as "present" for message delivery and registers a listener for new messages and conflicting
* connections. If the given device already has a presence registered with this manager, that presence is displaced
* immediately and the listener's {@link WebSocketConnectionEventListener#handleConflictingMessageReader()} method is called.
* immediately and the listener's {@link MessageAvailabilityListener#handleConflictingMessageReader()} method is called.
*
* @param accountIdentifier the account identifier for the newly-connected device
* @param deviceId the ID of the newly-connected device within the given account
@@ -151,13 +151,13 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
* pub/sub subscription could not be established, in which case callers should close the client's connection to the
* server
*/
public CompletionStage<Void> handleClientConnected(final UUID accountIdentifier, final byte deviceId, final WebSocketConnectionEventListener listener) {
public CompletionStage<Void> handleClientConnected(final UUID accountIdentifier, final byte deviceId, final MessageAvailabilityListener listener) {
if (pubSubConnection == null) {
throw new IllegalStateException("WebSocket connection event manager not started");
}
final byte[] eventChannel = getClientEventChannel(accountIdentifier, deviceId);
final AtomicReference<WebSocketConnectionEventListener> displacedListener = new AtomicReference<>();
final AtomicReference<MessageAvailabilityListener> displacedListener = new AtomicReference<>();
final AtomicReference<CompletionStage<Void>> subscribeFuture = new AtomicReference<>();
// Note that we're relying on some specific implementation details of `ConcurrentHashMap#compute(...)`. In
@@ -315,7 +315,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
final AccountAndDeviceIdentifier accountAndDeviceIdentifier = parseClientEventChannel(shardChannel);
@Nullable final WebSocketConnectionEventListener listener =
@Nullable final MessageAvailabilityListener listener =
listenersByAccountAndDeviceIdentifier.get(accountAndDeviceIdentifier);
if (listener != null) {