Scope disconnection request listeners to a single connection

This commit is contained in:
Jon Chambers
2025-07-23 12:13:04 -04:00
committed by Jon Chambers
parent 541c87e262
commit cf222e1105
13 changed files with 208 additions and 207 deletions

View File

@@ -22,11 +22,8 @@ public interface WebSocketConnectionEventListener {
void handleMessagesPersisted();
/**
* Indicates that the client's presence has been displaced and the listener should close the client's underlying
* network connection.
*
* @param connectedElsewhere if {@code true}, indicates that the client's presence has been displaced by another
* connection from the same client
* Indicates a newer instance of this client has started reading messages and the listener should close this client's
* underlying network connection.
*/
void handleConnectionDisplaced(boolean connectedElsewhere);
void handleConflictingMessageReader();
}

View File

@@ -17,11 +17,9 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -32,8 +30,6 @@ import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.DisconnectionRequestListener;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@@ -56,10 +52,9 @@ import org.whispersystems.textsecuregcm.util.Util;
* servers, but cannot guarantee at-most-one behavior.
*
* @see WebSocketConnectionEventListener
* @see org.whispersystems.textsecuregcm.storage.MessagesManager#insert(UUID, byte, MessageProtos.Envelope)
* @see org.whispersystems.textsecuregcm.storage.MessagesManager#insert(UUID, Map)
*/
public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<byte[], byte[]> implements Managed,
DisconnectionRequestListener {
public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<byte[], byte[]> implements Managed {
private final AccountsManager accountsManager;
private final PushNotificationManager pushNotificationManager;
@@ -145,7 +140,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
/**
* Marks the given device as "present" for message delivery and registers a listener for new messages and conflicting
* connections. If the given device already has a presence registered with this manager, that presence is displaced
* immediately and the listener's {@link WebSocketConnectionEventListener#handleConnectionDisplaced(boolean)} method is called.
* immediately and the listener's {@link WebSocketConnectionEventListener#handleConflictingMessageReader()} method is called.
*
* @param accountIdentifier the account identifier for the newly-connected device
* @param deviceId the ID of the newly-connected device within the given account
@@ -189,7 +184,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
});
if (displacedListener.get() != null) {
listenerEventExecutor.execute(() -> displacedListener.get().handleConnectionDisplaced(true));
listenerEventExecutor.execute(() -> displacedListener.get().handleConflictingMessageReader());
}
return subscribeFuture.get()
@@ -260,14 +255,6 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
return listenersByAccountAndDeviceIdentifier.containsKey(new AccountAndDeviceIdentifier(accountUuid, deviceId));
}
@Override
public void handleDisconnectionRequest(final UUID accountIdentifier, final Collection<Byte> deviceIds) {
deviceIds.stream()
.map(deviceId -> listenersByAccountAndDeviceIdentifier.get(new AccountAndDeviceIdentifier(accountIdentifier, deviceId)))
.filter(Objects::nonNull)
.forEach(listener -> listener.handleConnectionDisplaced(false));
}
@VisibleForTesting
void resubscribe(final ClusterTopologyChangedEvent clusterTopologyChangedEvent) {
final boolean[] changedSlots = RedisClusterUtil.getChangedSlots(clusterTopologyChangedEvent);
@@ -339,7 +326,7 @@ public class WebSocketConnectionEventManager extends RedisClusterPubSubAdapter<b
// Only act on new connections to other event manager instances; we'll learn about displacements in THIS
// instance when we update the listener map in `handleClientConnected`
if (!this.serverId.equals(UUIDUtil.fromByteString(clientEvent.getClientConnected().getServerId()))) {
listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(true));
listenerEventExecutor.execute(listener::handleConflictingMessageReader);
}
}