Distinguish local vs remote in ClientPresenceManager#disconnectPresence

This commit is contained in:
Chris Eager
2021-12-02 15:32:42 -07:00
committed by GitHub
parent e507ce2f26
commit 13e346d4eb
8 changed files with 388 additions and 293 deletions

View File

@@ -5,236 +5,314 @@
package org.whispersystems.textsecuregcm.push;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.AbstractRedisClusterTest;
import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
public class ClientPresenceManagerTest extends AbstractRedisClusterTest {
class ClientPresenceManagerTest {
private ScheduledExecutorService presenceRenewalExecutorService;
private ClientPresenceManager clientPresenceManager;
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build();
private static final DisplacedPresenceListener NO_OP = () -> {};
private ScheduledExecutorService presenceRenewalExecutorService;
private ClientPresenceManager clientPresenceManager;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
private static final DisplacedPresenceListener NO_OP = () -> {
};
getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$z");
});
@BeforeEach
void setUp() throws Exception {
presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor();
clientPresenceManager = new ClientPresenceManager(getRedisCluster(), presenceRenewalExecutorService, presenceRenewalExecutorService);
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
presenceRenewalExecutorService = Executors.newSingleThreadScheduledExecutor();
clientPresenceManager = new ClientPresenceManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
presenceRenewalExecutorService,
presenceRenewalExecutorService);
}
@AfterEach
public void tearDown() throws Exception {
presenceRenewalExecutorService.shutdown();
presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES);
clientPresenceManager.stop();
}
@Test
void testIsPresent() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
assertTrue(clientPresenceManager.isPresent(accountUuid, deviceId));
}
@Test
void testIsLocallyPresent() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
assertFalse(clientPresenceManager.isLocallyPresent(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> connection.sync().flushall());
assertTrue(clientPresenceManager.isLocallyPresent(accountUuid, deviceId));
}
@Test
void testLocalDisplacement() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
final AtomicInteger displacementCounter = new AtomicInteger(0);
final DisplacedPresenceListener displacementListener = displacementCounter::incrementAndGet;
clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener);
assertEquals(0, displacementCounter.get());
clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener);
assertEquals(1, displacementCounter.get());
}
@Test
void testRemoteDisplacement() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
final CompletableFuture<?> displaced = new CompletableFuture<>();
clientPresenceManager.start();
clientPresenceManager.setPresent(accountUuid, deviceId, () -> displaced.complete(null));
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(
connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString()));
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
}
@Test
void testRemoteDisplacementAfterTopologyChange() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
final CompletableFuture<?> displaced = new CompletableFuture<>();
clientPresenceManager.start();
clientPresenceManager.setPresent(accountUuid, deviceId, () -> displaced.complete(null));
clientPresenceManager.getPubSubConnection()
.usePubSubConnection(connection -> connection.getResources().eventBus()
.publish(new ClusterTopologyChangedEvent(List.of(), List.of())));
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(
connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString()));
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
}
@Test
void testClearPresence() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(
connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString()));
assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId));
}
@Test
void testPruneMissingPeers() {
final String presentPeerId = UUID.randomUUID().toString();
final String missingPeerId = UUID.randomUUID().toString();
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId);
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId);
});
for (int i = 0; i < 10; i++) {
addClientPresence(presentPeerId);
addClientPresence(missingPeerId);
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
clientPresenceManager.getPubSubConnection().usePubSubConnection(
connection -> connection.sync().upstream().commands()
.subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId)));
clientPresenceManager.pruneMissingPeers();
presenceRenewalExecutorService.shutdown();
presenceRenewalExecutorService.awaitTermination(1, TimeUnit.MINUTES);
assertEquals(1, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(
connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId))));
assertTrue(REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(
(Function<StatefulRedisClusterConnection<String, String>, Boolean>) connection -> connection.sync()
.sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId)));
assertEquals(0, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(
connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId))));
assertFalse(REDIS_CLUSTER_EXTENSION.getRedisCluster().withCluster(
(Function<StatefulRedisClusterConnection<String, String>, Boolean>) connection -> connection.sync()
.sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
}
private void addClientPresence(final String managerId) {
final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().set(clientPresenceKey, managerId);
connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey);
});
}
@Test
void testClearAllOnStop() {
final int localAccounts = 10;
final UUID[] localUuids = new UUID[localAccounts];
final long[] localDeviceIds = new long[localAccounts];
for (int i = 0; i < localAccounts; i++) {
localUuids[i] = UUID.randomUUID();
localDeviceIds[i] = i;
clientPresenceManager.setPresent(localUuids[i], localDeviceIds[i], NO_OP);
}
final UUID displacedAccountUuid = UUID.randomUUID();
final long displacedAccountDeviceId = 7;
clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP);
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> connection.sync()
.set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId),
UUID.randomUUID().toString()));
clientPresenceManager.stop();
for (int i = 0; i < localAccounts; i++) {
localUuids[i] = UUID.randomUUID();
localDeviceIds[i] = i;
assertFalse(clientPresenceManager.isPresent(localUuids[i], localDeviceIds[i]));
}
assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId));
}
@Nested
class MultiServerTest {
private ClientPresenceManager server1;
private ClientPresenceManager server2;
@BeforeEach
void setup() throws Exception {
REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> {
connection.sync().flushall();
connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz");
});
final ScheduledExecutorService scheduledExecutorService1 = mock(ScheduledExecutorService.class);
final ExecutorService keyspaceNotificationExecutorService1 = Executors.newSingleThreadExecutor();
server1 = new ClientPresenceManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
scheduledExecutorService1, keyspaceNotificationExecutorService1);
final ScheduledExecutorService scheduledExecutorService2 = mock(ScheduledExecutorService.class);
final ExecutorService keyspaceNotificationExecutorService2 = Executors.newSingleThreadExecutor();
server2 = new ClientPresenceManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
scheduledExecutorService2, keyspaceNotificationExecutorService2);
server1.start();
server2.start();
}
@AfterEach
void teardown() {
server2.stop();
server1.stop();
}
@Test
public void testIsPresent() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
void testSetPresentRemotely() {
final UUID uuid1 = UUID.randomUUID();
final long deviceId = 1L;
assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId));
final CompletableFuture<?> displaced = new CompletableFuture<>();
final DisplacedPresenceListener listener1 = () -> displaced.complete(null);
server1.setPresent(uuid1, deviceId, listener1);
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
assertTrue(clientPresenceManager.isPresent(accountUuid, deviceId));
server2.setPresent(uuid1, deviceId, () -> {
});
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
}
@Test
public void testIsLocallyPresent() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
void testDisconnectPresenceLocally() {
final UUID uuid1 = UUID.randomUUID();
final long deviceId = 1L;
assertFalse(clientPresenceManager.isLocallyPresent(accountUuid, deviceId));
final CompletableFuture<?> displaced = new CompletableFuture<>();
final DisplacedPresenceListener listener1 = () -> displaced.complete(null);
server1.setPresent(uuid1, deviceId, listener1);
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
getRedisCluster().useCluster(connection -> connection.sync().flushall());
server1.disconnectPresence(uuid1, deviceId);
assertTrue(clientPresenceManager.isLocallyPresent(accountUuid, deviceId));
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
}
@Test
public void testLocalDisplacement() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
void testDisconnectPresenceRemotely() {
final UUID uuid1 = UUID.randomUUID();
final long deviceId = 1L;
final AtomicInteger displacementCounter = new AtomicInteger(0);
final DisplacedPresenceListener displacementListener = displacementCounter::incrementAndGet;
final CompletableFuture<?> displaced = new CompletableFuture<>();
final DisplacedPresenceListener listener1 = () -> displaced.complete(null);
server1.setPresent(uuid1, deviceId, listener1);
clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener);
server2.disconnectPresence(uuid1, deviceId);
assertEquals(0, displacementCounter.get());
clientPresenceManager.setPresent(accountUuid, deviceId, displacementListener);
assertEquals(1, displacementCounter.get());
}
@Test(timeout = 10_000)
public void testRemoteDisplacement() throws InterruptedException {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
final AtomicBoolean displaced = new AtomicBoolean(false);
clientPresenceManager.start();
try {
clientPresenceManager.setPresent(accountUuid, deviceId, () -> {
synchronized (displaced) {
displaced.set(true);
displaced.notifyAll();
}
});
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString()));
synchronized (displaced) {
while (!displaced.get()) {
displaced.wait();
}
}
} finally {
clientPresenceManager.stop();
}
}
@Test(timeout = 10_000)
public void testRemoteDisplacementAfterTopologyChange() throws InterruptedException {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
final AtomicBoolean displaced = new AtomicBoolean(false);
clientPresenceManager.start();
try {
clientPresenceManager.setPresent(accountUuid, deviceId, () -> {
synchronized (displaced) {
displaced.set(true);
displaced.notifyAll();
}
});
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.getResources().eventBus().publish(new ClusterTopologyChangedEvent(List.of(), List.of())));
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString()));
synchronized (displaced) {
while (!displaced.get()) {
displaced.wait();
}
}
} finally {
clientPresenceManager.stop();
}
}
@Test
public void testClearPresence() {
final UUID accountUuid = UUID.randomUUID();
final long deviceId = 1;
assertFalse(clientPresenceManager.isPresent(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
assertTrue(clientPresenceManager.clearPresence(accountUuid, deviceId));
clientPresenceManager.setPresent(accountUuid, deviceId, NO_OP);
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(accountUuid, deviceId),
UUID.randomUUID().toString()));
assertFalse(clientPresenceManager.clearPresence(accountUuid, deviceId));
}
@Test
public void testPruneMissingPeers() {
final String presentPeerId = UUID.randomUUID().toString();
final String missingPeerId = UUID.randomUUID().toString();
getRedisCluster().useCluster(connection -> {
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId);
connection.sync().sadd(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId);
});
for (int i = 0; i < 10; i++) {
addClientPresence(presentPeerId);
addClientPresence(missingPeerId);
}
clientPresenceManager.getPubSubConnection().usePubSubConnection(connection -> connection.sync().upstream().commands().subscribe(ClientPresenceManager.getManagerPresenceChannel(presentPeerId)));
clientPresenceManager.pruneMissingPeers();
assertEquals(1, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(presentPeerId))));
assertTrue(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, presentPeerId)));
assertEquals(0, (long)getRedisCluster().withCluster(connection -> connection.sync().exists(ClientPresenceManager.getConnectedClientSetKey(missingPeerId))));
assertFalse(getRedisCluster().withCluster(connection -> connection.sync().sismember(ClientPresenceManager.MANAGER_SET_KEY, missingPeerId)));
}
private void addClientPresence(final String managerId) {
final String clientPresenceKey = ClientPresenceManager.getPresenceKey(UUID.randomUUID(), 7);
getRedisCluster().useCluster(connection -> {
connection.sync().set(clientPresenceKey, managerId);
connection.sync().sadd(ClientPresenceManager.getConnectedClientSetKey(managerId), clientPresenceKey);
});
}
@Test
public void testClearAllOnStop() {
final int localAccounts = 10;
final UUID[] localUuids = new UUID[localAccounts];
final long[] localDeviceIds = new long[localAccounts];
for (int i = 0; i < localAccounts; i++) {
localUuids[i] = UUID.randomUUID();
localDeviceIds[i] = i;
clientPresenceManager.setPresent(localUuids[i], localDeviceIds[i], NO_OP);
}
final UUID displacedAccountUuid = UUID.randomUUID();
final long displacedAccountDeviceId = 7;
clientPresenceManager.setPresent(displacedAccountUuid, displacedAccountDeviceId, NO_OP);
getRedisCluster().useCluster(connection -> connection.sync().set(ClientPresenceManager.getPresenceKey(displacedAccountUuid, displacedAccountDeviceId),
UUID.randomUUID().toString()));
clientPresenceManager.stop();
for (int i = 0; i < localAccounts; i++) {
localUuids[i] = UUID.randomUUID();
localDeviceIds[i] = i;
assertFalse(clientPresenceManager.isPresent(localUuids[i], localDeviceIds[i]));
}
assertTrue(clientPresenceManager.isPresent(displacedAccountUuid, displacedAccountDeviceId));
assertTimeoutPreemptively(Duration.ofSeconds(10), displaced::join);
}
}
}