Rename PubSubClientEventManager to WebSocketConnectionEventManager

This commit is contained in:
Jon Chambers
2024-11-09 09:16:22 -05:00
committed by Jon Chambers
parent 52b759c009
commit a843f1af6c
29 changed files with 240 additions and 218 deletions

View File

@@ -193,7 +193,7 @@ import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.FcmSender;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@@ -597,7 +597,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
secureValueRecoveryServiceExecutor, secureValueRecoveryServiceRetryExecutor, config.getSvr2Configuration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, storageServiceRetryExecutor, config.getSecureStorageServiceConfiguration());
PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor);
WebSocketConnectionEventManager webSocketConnectionEventManager = new WebSocketConnectionEventManager(messagesCluster, clientEventExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, clock, dynamicConfigurationManager);
@@ -615,7 +615,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new ClientPublicKeysManager(clientPublicKeys, accountLockManager, accountLockExecutor);
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,
pubsubClient, accountLockManager, keysManager, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client, pubSubClientEventManager,
secureStorageClient, secureValueRecovery2Client, webSocketConnectionEventManager,
registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor,
clock, config.getLinkDeviceSecretConfiguration().secret().value(), dynamicConfigurationManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
@@ -645,7 +645,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new MessageDeliveryLoopMonitor(rateLimitersCluster);
final RegistrationLockVerificationManager registrationLockVerificationManager = new RegistrationLockVerificationManager(
accountsManager, pubSubClientEventManager, svr2CredentialsGenerator, svr3CredentialsGenerator,
accountsManager, webSocketConnectionEventManager, svr2CredentialsGenerator, svr3CredentialsGenerator,
registrationRecoveryPasswordsManager, pushNotificationManager, rateLimiters);
final ReportedMessageMetricsListener reportedMessageMetricsListener = new ReportedMessageMetricsListener(
@@ -720,7 +720,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(pushNotificationScheduler);
environment.lifecycle().manage(provisioningManager);
environment.lifecycle().manage(pubSubClientEventManager);
environment.lifecycle().manage(webSocketConnectionEventManager);
environment.lifecycle().manage(currencyManager);
environment.lifecycle().manage(registrationServiceClient);
environment.lifecycle().manage(keyTransparencyServiceClient);
@@ -972,7 +972,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.jersey().register(MultiRecipientMessageProvider.class);
environment.jersey().register(new AuthDynamicFeature(accountAuthFilter));
environment.jersey().register(new AuthValueFactoryProvider.Binder<>(AuthenticatedDevice.class));
environment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, pubSubClientEventManager));
environment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager,
webSocketConnectionEventManager));
environment.jersey().register(new TimestampResponseFilter());
///
@@ -982,15 +983,15 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(accountAuthenticator, new AccountPrincipalSupplier(accountsManager)));
webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, messageMetrics, pushNotificationManager,
pushNotificationScheduler, pubSubClientEventManager, websocketScheduledExecutor,
pushNotificationScheduler, webSocketConnectionEventManager, websocketScheduledExecutor,
messageDeliveryScheduler, clientReleaseManager, messageDeliveryLoopMonitor));
webSocketEnvironment.jersey()
.register(new WebsocketRefreshApplicationEventListener(accountsManager, pubSubClientEventManager));
.register(new WebsocketRefreshApplicationEventListener(accountsManager, webSocketConnectionEventManager));
webSocketEnvironment.jersey().register(new RateLimitByIpFilter(rateLimiters));
webSocketEnvironment.jersey().register(new RequestStatisticsFilter(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET, clientReleaseManager));
webSocketEnvironment.jersey().register(new KeepAliveController(pubSubClientEventManager));
webSocketEnvironment.jersey().register(new KeepAliveController(webSocketConnectionEventManager));
webSocketEnvironment.jersey().register(new TimestampResponseFilter());
final List<SpamFilter> spamFilters = ServiceLoader.load(SpamFilter.class)
@@ -1127,10 +1128,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
WebSocketEnvironment<AuthenticatedDevice> provisioningEnvironment = new WebSocketEnvironment<>(environment,
webSocketEnvironment.getRequestLog(), Duration.ofMillis(60000));
provisioningEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, pubSubClientEventManager));
provisioningEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager,
webSocketConnectionEventManager));
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(provisioningManager));
provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET, clientReleaseManager));
provisioningEnvironment.jersey().register(new KeepAliveController(pubSubClientEventManager));
provisioningEnvironment.jersey().register(new KeepAliveController(webSocketConnectionEventManager));
provisioningEnvironment.jersey().register(new TimestampResponseFilter());
registerExceptionMappers(environment, webSocketEnvironment, provisioningEnvironment);

View File

@@ -26,7 +26,7 @@ import org.whispersystems.textsecuregcm.entities.Svr3Credentials;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
@@ -54,7 +54,7 @@ public class RegistrationLockVerificationManager {
private static final String PHONE_VERIFICATION_TYPE_TAG_NAME = "phoneVerificationType";
private final AccountsManager accounts;
private final PubSubClientEventManager pubSubClientEventManager;
private final WebSocketConnectionEventManager webSocketConnectionEventManager;
private final ExternalServiceCredentialsGenerator svr2CredentialGenerator;
private final ExternalServiceCredentialsGenerator svr3CredentialGenerator;
private final RateLimiters rateLimiters;
@@ -63,14 +63,14 @@ public class RegistrationLockVerificationManager {
public RegistrationLockVerificationManager(
final AccountsManager accounts,
final PubSubClientEventManager pubSubClientEventManager,
final WebSocketConnectionEventManager webSocketConnectionEventManager,
final ExternalServiceCredentialsGenerator svr2CredentialGenerator,
final ExternalServiceCredentialsGenerator svr3CredentialGenerator,
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager,
final PushNotificationManager pushNotificationManager,
final RateLimiters rateLimiters) {
this.accounts = accounts;
this.pubSubClientEventManager = pubSubClientEventManager;
this.webSocketConnectionEventManager = webSocketConnectionEventManager;
this.svr2CredentialGenerator = svr2CredentialGenerator;
this.svr3CredentialGenerator = svr3CredentialGenerator;
this.registrationRecoveryPasswordsManager = registrationRecoveryPasswordsManager;
@@ -161,7 +161,7 @@ public class RegistrationLockVerificationManager {
}
final List<Byte> deviceIds = updatedAccount.getDevices().stream().map(Device::getId).toList();
pubSubClientEventManager.requestDisconnection(updatedAccount.getUuid(), deviceIds);
webSocketConnectionEventManager.requestDisconnection(updatedAccount.getUuid(), deviceIds);
try {
// Send a push notification that prompts the client to attempt login and fail due to locked credentials

View File

@@ -9,7 +9,7 @@ import org.glassfish.jersey.server.monitoring.ApplicationEvent;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
/**
@@ -20,9 +20,10 @@ public class WebsocketRefreshApplicationEventListener implements ApplicationEven
private final WebsocketRefreshRequestEventListener websocketRefreshRequestEventListener;
public WebsocketRefreshApplicationEventListener(final AccountsManager accountsManager,
final PubSubClientEventManager pubSubClientEventManager) {
final WebSocketConnectionEventManager webSocketConnectionEventManager) {
this.websocketRefreshRequestEventListener = new WebsocketRefreshRequestEventListener(pubSubClientEventManager,
this.websocketRefreshRequestEventListener = new WebsocketRefreshRequestEventListener(
webSocketConnectionEventManager,
new LinkedDeviceRefreshRequirementProvider(accountsManager),
new PhoneNumberChangeRefreshRequirementProvider(accountsManager));
}

View File

@@ -19,11 +19,11 @@ import org.glassfish.jersey.server.monitoring.RequestEvent.Type;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
public class WebsocketRefreshRequestEventListener implements RequestEventListener {
private final PubSubClientEventManager pubSubClientEventManager;
private final WebSocketConnectionEventManager webSocketConnectionEventManager;
private final WebsocketRefreshRequirementProvider[] providers;
private static final Counter DISPLACED_ACCOUNTS = Metrics.counter(
@@ -35,10 +35,10 @@ public class WebsocketRefreshRequestEventListener implements RequestEventListene
private static final Logger logger = LoggerFactory.getLogger(WebsocketRefreshRequestEventListener.class);
public WebsocketRefreshRequestEventListener(
final PubSubClientEventManager pubSubClientEventManager,
final WebSocketConnectionEventManager webSocketConnectionEventManager,
final WebsocketRefreshRequirementProvider... providers) {
this.pubSubClientEventManager = pubSubClientEventManager;
this.webSocketConnectionEventManager = webSocketConnectionEventManager;
this.providers = providers;
}
@@ -60,7 +60,7 @@ public class WebsocketRefreshRequestEventListener implements RequestEventListene
.forEach(pair -> {
try {
displacedDevices.incrementAndGet();
pubSubClientEventManager.requestDisconnection(pair.first(), List.of(pair.second()));
webSocketConnectionEventManager.requestDisconnection(pair.first(), List.of(pair.second()));
} catch (final Exception e) {
logger.error("Could not displace device presence", e);
}

View File

@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.websocket.auth.ReadOnly;
import org.whispersystems.websocket.session.WebSocketSession;
import org.whispersystems.websocket.session.WebSocketSessionContext;
@@ -34,14 +34,14 @@ public class KeepAliveController {
private final Logger logger = LoggerFactory.getLogger(KeepAliveController.class);
private final PubSubClientEventManager pubSubClientEventManager;
private final WebSocketConnectionEventManager webSocketConnectionEventManager;
private static final String CLOSED_CONNECTION_AGE_DISTRIBUTION_NAME = name(KeepAliveController.class,
"closedConnectionAge");
public KeepAliveController(final PubSubClientEventManager pubSubClientEventManager) {
this.pubSubClientEventManager = pubSubClientEventManager;
public KeepAliveController(final WebSocketConnectionEventManager webSocketConnectionEventManager) {
this.webSocketConnectionEventManager = webSocketConnectionEventManager;
}
@GET
@@ -49,7 +49,7 @@ public class KeepAliveController {
@WebSocketSession WebSocketSessionContext context) {
maybeAuth.ifPresent(auth -> {
if (!pubSubClientEventManager.isLocallyPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())) {
if (!webSocketConnectionEventManager.isLocallyPresent(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId())) {
final Duration age = Duration.between(context.getClient().getCreated(), Instant.now());

View File

@@ -6,10 +6,10 @@
package org.whispersystems.textsecuregcm.push;
/**
* A client event listener handles events related to a client's message-retrieval presence. Handler methods are run on
* dedicated threads and may safely perform blocking operations.
* A WebSocket connection event listener handles message availability and presence events related to a client's open
* WebSocket connection. Handler methods are run on dedicated threads and may safely perform blocking operations.
*/
public interface ClientEventListener {
public interface WebSocketConnectionEventListener {
/**
* Indicates that a new message is available in the connected client's message queue.

View File

@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@@ -39,16 +40,31 @@ import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
/**
* The pub/sub-based client presence manager uses the Redis 7 sharded pub/sub system to notify connected clients that
* new messages are available for retrieval and report to senders whether a client was present to receive a message when
* sent. This system makes a best effort to ensure that a given client has only a single open connection across the
* fleet of servers, but cannot guarantee at-most-one behavior.
* The WebSocket connection event manager distributes events related to client presence and message availability to
* registered listeners. In the current Signal server implementation, clients generally interact with the service by
* opening a dual-purpose WebSocket. The WebSocket serves as both a delivery mechanism for messages and as a channel
* for the client to issue API requests to the server. Clients are considered "present" if they have an open WebSocket
* connection and are therefore likely to receive messages as soon as they're delivered to the server. WebSocket
* connection managers make a best effort to ensure that clients have at most one active message delivery channel at
* a time.
*
* @implNote The WebSocket connection event manager uses the Redis 7 sharded pub/sub system to distribute events. This
* system makes a best effort to ensure that a given client has only a single open connection across the fleet of
* servers, but cannot guarantee at-most-one behavior.
*
* @see WebSocketConnectionEventListener
* @see org.whispersystems.textsecuregcm.storage.MessagesManager#insert(UUID, byte, MessageProtos.Envelope)
*/
public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[], byte[]> implements Managed {
public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<byte[], byte[]> implements Managed {
private final FaultTolerantRedisClusterClient clusterClient;
private final Executor listenerEventExecutor;
@Nullable
private FaultTolerantPubSubClusterConnection<byte[], byte[]> pubSubConnection;
private final Map<AccountAndDeviceIdentifier, WebSocketConnectionEventListener> listenersByAccountAndDeviceIdentifier;
private final UUID serverId = UUID.randomUUID();
private final byte[] CLIENT_CONNECTED_EVENT_BYTES = ClientEvent.newBuilder()
@@ -58,36 +74,31 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
.build()
.toByteArray();
@Nullable
private FaultTolerantPubSubClusterConnection<byte[], byte[]> pubSubConnection;
private final Map<AccountAndDeviceIdentifier, ClientEventListener> listenersByAccountAndDeviceIdentifier;
private static final byte[] DISCONNECT_REQUESTED_EVENT_BYTES = ClientEvent.newBuilder()
.setDisconnectRequested(DisconnectRequested.getDefaultInstance())
.build()
.toByteArray();
private static final Counter PUBLISH_CLIENT_CONNECTION_EVENT_ERROR_COUNTER =
Metrics.counter(MetricsUtil.name(PubSubClientEventManager.class, "publishClientConnectionEventError"));
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "publishClientConnectionEventError"));
private static final Counter UNSUBSCRIBE_ERROR_COUNTER =
Metrics.counter(MetricsUtil.name(PubSubClientEventManager.class, "unsubscribeError"));
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "unsubscribeError"));
private static final Counter MESSAGE_WITHOUT_LISTENER_COUNTER =
Metrics.counter(MetricsUtil.name(PubSubClientEventManager.class, "messageWithoutListener"));
Metrics.counter(MetricsUtil.name(WebSocketConnectionEventManager.class, "messageWithoutListener"));
private static final String LISTENER_GAUGE_NAME =
MetricsUtil.name(PubSubClientEventManager.class, "listeners");
MetricsUtil.name(WebSocketConnectionEventManager.class, "listeners");
private static final Logger logger = LoggerFactory.getLogger(PubSubClientEventManager.class);
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnectionEventManager.class);
@VisibleForTesting
record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {
}
public PubSubClientEventManager(final FaultTolerantRedisClusterClient clusterClient,
final Executor listenerEventExecutor) {
public WebSocketConnectionEventManager(final FaultTolerantRedisClusterClient clusterClient,
final Executor listenerEventExecutor) {
this.clusterClient = clusterClient;
this.listenerEventExecutor = listenerEventExecutor;
@@ -117,27 +128,26 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
}
/**
* Marks the given device as "present" and registers a listener for new messages and conflicting connections. If the
* given device already has a presence registered with this presence manager instance, that presence is displaced
* immediately and the listener's {@link ClientEventListener#handleConnectionDisplaced(boolean)} method is called.
* Marks the given device as "present" for message delivery and registers a listener for new messages and conflicting
* connections. If the given device already has a presence registered with this manager, that presence is displaced
* immediately and the listener's {@link WebSocketConnectionEventListener#handleConnectionDisplaced(boolean)} method is called.
*
* @param accountIdentifier the account identifier for the newly-connected device
* @param deviceId the ID of the newly-connected device within the given account
* @param listener the listener to notify when new messages or conflicting connections arrive for the newly-conencted
* @param listener the listener to notify when new messages or conflicting connections arrive for the newly-connected
* device
*
* @return a future that yields a connection identifier when the new device's presence has been registered; the future
* may fail if a pub/sub subscription could not be established, in which case callers should close the client's
* connection to the server
* @return a future that completes when the new device's presence has been registered; the future may fail if a
* pub/sub subscription could not be established, in which case callers should close the client's connection to the
* server
*/
public CompletionStage<UUID> handleClientConnected(final UUID accountIdentifier, final byte deviceId, final ClientEventListener listener) {
public CompletionStage<Void> handleClientConnected(final UUID accountIdentifier, final byte deviceId, final WebSocketConnectionEventListener listener) {
if (pubSubConnection == null) {
throw new IllegalStateException("Presence manager not started");
throw new IllegalStateException("WebSocket connection event manager not started");
}
final UUID connectionId = UUID.randomUUID();
final byte[] clientPresenceKey = getClientEventChannel(accountIdentifier, deviceId);
final AtomicReference<ClientEventListener> displacedListener = new AtomicReference<>();
final byte[] eventChannel = getClientEventChannel(accountIdentifier, deviceId);
final AtomicReference<WebSocketConnectionEventListener> displacedListener = new AtomicReference<>();
final AtomicReference<CompletionStage<Void>> subscribeFuture = new AtomicReference<>();
// Note that we're relying on some specific implementation details of `ConcurrentHashMap#compute(...)`. In
@@ -153,7 +163,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
listenersByAccountAndDeviceIdentifier.compute(new AccountAndDeviceIdentifier(accountIdentifier, deviceId),
(key, existingListener) -> {
subscribeFuture.set(pubSubConnection.withPubSubConnection(connection ->
connection.async().ssubscribe(clientPresenceKey)));
connection.async().ssubscribe(eventChannel)));
if (existingListener != null) {
displacedListener.set(existingListener);
@@ -168,28 +178,28 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
return subscribeFuture.get()
.thenCompose(ignored -> clusterClient.withBinaryCluster(connection -> connection.async()
.spublish(clientPresenceKey, CLIENT_CONNECTED_EVENT_BYTES)))
.spublish(eventChannel, CLIENT_CONNECTED_EVENT_BYTES)))
.handle((ignored, throwable) -> {
if (throwable != null) {
PUBLISH_CLIENT_CONNECTION_EVENT_ERROR_COUNTER.increment();
}
return connectionId;
return null;
});
}
/**
* Removes the "presence" for the given device. Callers should call this method when they have been notified that
* the client's underlying network connection has been closed.
* Removes the "presence" and event listener for the given device. Callers should call this method when the client's
* underlying network connection has closed.
*
* @param accountIdentifier the identifier of the account for the disconnected device
* @param deviceId the ID of the disconnected device within the given account
*
* @return a future that completes when the presence has been removed
* @return a future that completes when the presence and event listener have been removed
*/
public CompletionStage<Void> handleClientDisconnected(final UUID accountIdentifier, final byte deviceId) {
if (pubSubConnection == null) {
throw new IllegalStateException("Presence manager not started");
throw new IllegalStateException("WebSocket connection event manager not started");
}
final AtomicReference<CompletionStage<Void>> unsubscribeFuture = new AtomicReference<>();
@@ -221,20 +231,20 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
}
/**
* Tests whether a client with the given account/device is connected to this presence manager instance.
* Tests whether a client with the given account/device is connected to this manager instance.
*
* @param accountUuid the account identifier for the client to check
* @param deviceId the ID of the device within the given account
*
* @return {@code true} if a client with the given account/device is connected to this presence manager instance or
* {@code false} if the client is not connected at all or is connected to a different presence manager instance
* @return {@code true} if a client with the given account/device is connected to this manager instance or
* {@code false} if the client is not connected at all or is connected to a different manager instance
*/
public boolean isLocallyPresent(final UUID accountUuid, final byte deviceId) {
return listenersByAccountAndDeviceIdentifier.containsKey(new AccountAndDeviceIdentifier(accountUuid, deviceId));
}
/**
* Broadcasts a request that all devices associated with the identified account and connected to any client presence
* Broadcasts a request that all devices associated with the identified account and connected to any event manager
* instance close their network connections.
*
* @param accountIdentifier the account identifier for which to request disconnection
@@ -246,8 +256,8 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
}
/**
* Broadcasts a request that the specified devices associated with the identified account and connected to any client
* presence instance close their network connections.
* Broadcasts a request that the specified devices associated with the identified account and connected to any event
* manager instance close their network connections.
*
* @param accountIdentifier the account identifier for which to request disconnection
* @param deviceIds the IDs of the devices for which to request disconnection
@@ -256,13 +266,9 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
*/
public CompletableFuture<Void> requestDisconnection(final UUID accountIdentifier, final Collection<Byte> deviceIds) {
return CompletableFuture.allOf(deviceIds.stream()
.map(deviceId -> {
final byte[] clientPresenceKey = getClientEventChannel(accountIdentifier, deviceId);
return clusterClient.withBinaryCluster(connection -> connection.async()
.spublish(clientPresenceKey, DISCONNECT_REQUESTED_EVENT_BYTES))
.toCompletableFuture();
})
.map(deviceId -> clusterClient.withBinaryCluster(connection -> connection.async()
.spublish(getClientEventChannel(accountIdentifier, deviceId), DISCONNECT_REQUESTED_EVENT_BYTES))
.toCompletableFuture())
.toArray(CompletableFuture[]::new));
}
@@ -270,7 +276,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
void resubscribe(final ClusterTopologyChangedEvent clusterTopologyChangedEvent) {
final boolean[] changedSlots = RedisClusterUtil.getChangedSlots(clusterTopologyChangedEvent);
final Map<Integer, List<byte[]>> clientPresenceKeysBySlot = new HashMap<>();
final Map<Integer, List<byte[]>> eventChannelsBySlot = new HashMap<>();
// Organize subscriptions by slot so we can issue a smaller number of larger resubscription commands
listenersByAccountAndDeviceIdentifier.keySet()
@@ -280,15 +286,15 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
final int slot = SlotHash.getSlot(clientEventChannel);
if (changedSlots[slot]) {
clientPresenceKeysBySlot.computeIfAbsent(slot, ignored -> new ArrayList<>()).add(clientEventChannel);
eventChannelsBySlot.computeIfAbsent(slot, ignored -> new ArrayList<>()).add(clientEventChannel);
}
});
// Issue one resubscription command per affected slot
clientPresenceKeysBySlot.forEach((slot, clientPresenceKeys) -> {
eventChannelsBySlot.forEach((slot, eventChannels) -> {
if (pubSubConnection != null) {
final byte[][] clientPresenceKeyArray = clientPresenceKeys.toArray(byte[][]::new);
pubSubConnection.usePubSubConnection(connection -> connection.sync().ssubscribe(clientPresenceKeyArray));
pubSubConnection.usePubSubConnection(connection ->
connection.sync().ssubscribe(eventChannels.toArray(byte[][]::new)));
}
});
}
@@ -324,9 +330,9 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
return;
}
final AccountAndDeviceIdentifier accountAndDeviceIdentifier = parseClientPresenceKey(shardChannel);
final AccountAndDeviceIdentifier accountAndDeviceIdentifier = parseClientEventChannel(shardChannel);
@Nullable final ClientEventListener listener =
@Nullable final WebSocketConnectionEventListener listener =
listenersByAccountAndDeviceIdentifier.get(accountAndDeviceIdentifier);
if (listener != null) {
@@ -334,7 +340,7 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
case NEW_MESSAGE_AVAILABLE -> listener.handleNewMessageAvailable();
case CLIENT_CONNECTED -> {
// Only act on new connections to other presence manager instances; we'll learn about displacements in THIS
// Only act on new connections to other event manager instances; we'll learn about displacements in THIS
// instance when we update the listener map in `handleClientConnected`
if (!this.serverId.equals(UUIDUtil.fromByteString(clientEvent.getClientConnected().getServerId()))) {
listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(true));
@@ -357,12 +363,12 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter<byte[],
return ("client_presence::{" + accountIdentifier + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8);
}
private static AccountAndDeviceIdentifier parseClientPresenceKey(final byte[] clientPresenceKeyBytes) {
final String clientPresenceKey = new String(clientPresenceKeyBytes, StandardCharsets.UTF_8);
private static AccountAndDeviceIdentifier parseClientEventChannel(final byte[] eventChannelBytes) {
final String eventChannel = new String(eventChannelBytes, StandardCharsets.UTF_8);
final int uuidStart = "client_presence::{".length();
final UUID accountIdentifier = UUID.fromString(clientPresenceKey.substring(uuidStart, uuidStart + 36));
final byte deviceId = Byte.parseByte(clientPresenceKey.substring(uuidStart + 38, clientPresenceKey.length() - 1));
final UUID accountIdentifier = UUID.fromString(eventChannel.substring(uuidStart, uuidStart + 36));
final byte deviceId = Byte.parseByte(eventChannel.substring(uuidStart + 38, eventChannel.length() - 1));
return new AccountAndDeviceIdentifier(accountIdentifier, deviceId);
}

View File

@@ -75,7 +75,7 @@ import org.whispersystems.textsecuregcm.entities.RestoreAccountRequest;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@@ -124,7 +124,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
private final ProfilesManager profilesManager;
private final SecureStorageClient secureStorageClient;
private final SecureValueRecovery2Client secureValueRecovery2Client;
private final PubSubClientEventManager pubSubClientEventManager;
private final WebSocketConnectionEventManager webSocketConnectionEventManager;
private final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager;
private final ClientPublicKeysManager clientPublicKeysManager;
private final Executor accountLockExecutor;
@@ -202,7 +202,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
final ProfilesManager profilesManager,
final SecureStorageClient secureStorageClient,
final SecureValueRecovery2Client secureValueRecovery2Client,
final PubSubClientEventManager pubSubClientEventManager,
final WebSocketConnectionEventManager webSocketConnectionEventManager,
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager,
final ClientPublicKeysManager clientPublicKeysManager,
final Executor accountLockExecutor,
@@ -219,7 +219,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
this.profilesManager = profilesManager;
this.secureStorageClient = secureStorageClient;
this.secureValueRecovery2Client = secureValueRecovery2Client;
this.pubSubClientEventManager = pubSubClientEventManager;
this.webSocketConnectionEventManager = webSocketConnectionEventManager;
this.registrationRecoveryPasswordsManager = requireNonNull(registrationRecoveryPasswordsManager);
this.clientPublicKeysManager = clientPublicKeysManager;
this.accountLockExecutor = accountLockExecutor;
@@ -325,7 +325,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
keysManager.deleteSingleUsePreKeys(pni),
messagesManager.clear(aci),
profilesManager.deleteAll(aci))
.thenCompose(ignored -> pubSubClientEventManager.requestDisconnection(aci))
.thenCompose(ignored -> webSocketConnectionEventManager.requestDisconnection(aci))
.thenCompose(ignored -> accounts.reclaimAccount(e.getExistingAccount(), account, additionalWriteItems))
.thenCompose(ignored -> {
// We should have cleared all messages before overwriting the old account, but more may have arrived
@@ -589,7 +589,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
})
.whenComplete((ignored, throwable) -> {
if (throwable == null) {
pubSubClientEventManager.requestDisconnection(accountIdentifier, List.of(deviceId));
webSocketConnectionEventManager.requestDisconnection(accountIdentifier, List.of(deviceId));
}
});
}
@@ -1236,7 +1236,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
registrationRecoveryPasswordsManager.removeForNumber(account.getNumber()))
.thenCompose(ignored -> accounts.delete(account.getUuid(), additionalWriteItems))
.thenCompose(ignored -> redisDeleteAsync(account))
.thenRun(() -> pubSubClientEventManager.requestDisconnection(account.getUuid()));
.thenRun(() -> webSocketConnectionEventManager.requestDisconnection(account.getUuid()));
}
private String getAccountMapKey(String key) {

View File

@@ -15,7 +15,7 @@ import java.util.UUID;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.ClientEvent;
import org.whispersystems.textsecuregcm.push.NewMessageAvailableEvent;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@@ -52,7 +52,7 @@ class MessagesCacheInsertScript {
MessagesCache.getMessageQueueKey(destinationUuid, destinationDevice), // queueKey
MessagesCache.getMessageQueueMetadataKey(destinationUuid, destinationDevice), // queueMetadataKey
MessagesCache.getQueueIndexKey(destinationUuid, destinationDevice), // queueTotalIndexKey
PubSubClientEventManager.getClientEventChannel(destinationUuid, destinationDevice) // eventChannelKey
WebSocketConnectionEventManager.getClientEventChannel(destinationUuid, destinationDevice) // eventChannelKey
);
final List<byte[]> args = new ArrayList<>(Arrays.asList(

View File

@@ -8,7 +8,7 @@ package org.whispersystems.textsecuregcm.storage;
import io.lettuce.core.ScriptOutputType;
import org.whispersystems.textsecuregcm.push.ClientEvent;
import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import java.io.IOException;
@@ -35,7 +35,7 @@ class MessagesCacheUnlockQueueScript {
void execute(final UUID accountIdentifier, final byte deviceId) {
final List<byte[]> keys = List.of(
MessagesCache.getPersistInProgressKey(accountIdentifier, deviceId), // persistInProgressKey
PubSubClientEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey
WebSocketConnectionEventManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey
);
unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS);

View File

@@ -59,10 +59,23 @@ public class MessagesManager {
this.messageDeletionExecutor = messageDeletionExecutor;
}
public boolean insert(UUID destinationUuid, byte destinationDevice, Envelope message) {
/**
* Inserts a message into a target device's message queue and notifies registered listeners that a new message is
* available.
*
* @param destinationUuid the account identifier for the destination queue
* @param destinationDeviceId the device ID for the destination queue
* @param message the message to insert into the queue
*
* @return {@code true} if the destination device is "present" (i.e. has an active event listener) or {@code false}
* otherwise
*
* @see org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager
*/
public boolean insert(final UUID destinationUuid, final byte destinationDeviceId, final Envelope message) {
final UUID messageGuid = UUID.randomUUID();
final boolean destinationPresent = messagesCache.insert(messageGuid, destinationUuid, destinationDevice, message);
final boolean destinationPresent = messagesCache.insert(messageGuid, destinationUuid, destinationDeviceId, message);
if (message.hasSourceServiceId() && !destinationUuid.toString().equals(message.getSourceServiceId())) {
reportMessageManager.store(message.getSourceServiceId(), messageGuid);

View File

@@ -15,7 +15,7 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@@ -40,7 +40,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final MessageMetrics messageMetrics;
private final PushNotificationManager pushNotificationManager;
private final PushNotificationScheduler pushNotificationScheduler;
private final PubSubClientEventManager pubSubClientEventManager;
private final WebSocketConnectionEventManager webSocketConnectionEventManager;
private final ScheduledExecutorService scheduledExecutorService;
private final Scheduler messageDeliveryScheduler;
private final ClientReleaseManager clientReleaseManager;
@@ -54,7 +54,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
MessageMetrics messageMetrics,
PushNotificationManager pushNotificationManager,
PushNotificationScheduler pushNotificationScheduler,
PubSubClientEventManager pubSubClientEventManager,
WebSocketConnectionEventManager webSocketConnectionEventManager,
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler,
ClientReleaseManager clientReleaseManager,
@@ -64,7 +64,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
this.messageMetrics = messageMetrics;
this.pushNotificationManager = pushNotificationManager;
this.pushNotificationScheduler = pushNotificationScheduler;
this.pubSubClientEventManager = pubSubClientEventManager;
this.webSocketConnectionEventManager = webSocketConnectionEventManager;
this.scheduledExecutorService = scheduledExecutorService;
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.clientReleaseManager = clientReleaseManager;
@@ -105,7 +105,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
// receive push notifications for inbound messages. We should do this first because, at this point, the
// connection has already closed and attempts to actually deliver a message via the connection will not succeed.
// It's preferable to start sending push notifications as soon as possible.
pubSubClientEventManager.handleClientDisconnected(auth.getAccount().getUuid(),
webSocketConnectionEventManager.handleClientDisconnected(auth.getAccount().getUuid(),
auth.getAuthenticatedDevice().getId());
// Finally, stop trying to deliver messages and send a push notification if the connection is aware of any
@@ -122,7 +122,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
// Finally, we register this client's presence, which suppresses push notifications. We do this last because
// receiving extra push notifications is generally preferable to missing out on a push notification.
pubSubClientEventManager.handleClientConnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection);
webSocketConnectionEventManager.handleClientConnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection);
} catch (final Exception e) {
log.warn("Failed to initialize websocket", e);
context.getClient().close(1011, "Unexpected error initializing connection");

View File

@@ -45,7 +45,7 @@ import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.ClientEventListener;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
@@ -63,7 +63,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import javax.annotation.Nullable;
public class WebSocketConnection implements ClientEventListener {
public class WebSocketConnection implements WebSocketConnectionEventListener {
private static final DistributionSummary messageTime = Metrics.summary(
name(MessageController.class, "messageDeliveryDuration"));

View File

@@ -36,7 +36,7 @@ import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MicrometerAwsSdkMetricPublisher;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.FcmSender;
import org.whispersystems.textsecuregcm.push.PubSubClientEventManager;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
@@ -205,7 +205,7 @@ record CommandDependencies(
configuration.getSvr2Configuration());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor);
WebSocketConnectionEventManager webSocketConnectionEventManager = new WebSocketConnectionEventManager(messagesCluster, clientEventExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
@@ -222,7 +222,7 @@ record CommandDependencies(
new ClientPublicKeysManager(clientPublicKeys, accountLockManager, accountLockExecutor);
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,
pubsubClient, accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client, pubSubClientEventManager,
secureStorageClient, secureValueRecovery2Client, webSocketConnectionEventManager,
registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor,
clock, configuration.getLinkDeviceSecretConfiguration().secret().value(), dynamicConfigurationManager);
RateLimiters rateLimiters = RateLimiters.createAndValidate(configuration.getLimitsConfiguration(),
@@ -259,7 +259,7 @@ record CommandDependencies(
Clock.systemUTC());
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(pubSubClientEventManager);
environment.lifecycle().manage(webSocketConnectionEventManager);
environment.lifecycle().manage(new ManagedAwsCrt());
return new CommandDependencies(