mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-25 01:58:06 +01:00
Remove redundant disconnection requests
This commit is contained in:
@@ -25,14 +25,10 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.DisconnectionRequestManager;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
|
||||
import org.whispersystems.textsecuregcm.push.MessageSender;
|
||||
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -45,8 +41,6 @@ public class MessagePersister implements Managed {
|
||||
private final MessagesManager messagesManager;
|
||||
private final AccountsManager accountsManager;
|
||||
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
private final DisconnectionRequestManager disconnectionRequestManager;
|
||||
|
||||
private final Duration persistDelay;
|
||||
|
||||
@@ -84,8 +78,6 @@ public class MessagePersister implements Managed {
|
||||
final MessagesManager messagesManager,
|
||||
final AccountsManager accountsManager,
|
||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
|
||||
final ExperimentEnrollmentManager experimentEnrollmentManager,
|
||||
final DisconnectionRequestManager disconnectionRequestManager,
|
||||
final Duration persistDelay,
|
||||
final int dedicatedProcessWorkerThreadCount) {
|
||||
|
||||
@@ -93,8 +85,6 @@ public class MessagePersister implements Managed {
|
||||
this.messagesManager = messagesManager;
|
||||
this.accountsManager = accountsManager;
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
this.experimentEnrollmentManager = experimentEnrollmentManager;
|
||||
this.disconnectionRequestManager = disconnectionRequestManager;
|
||||
this.persistDelay = persistDelay;
|
||||
this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount];
|
||||
|
||||
@@ -260,9 +250,7 @@ public class MessagePersister implements Managed {
|
||||
throw new MessagePersistenceException("Could not persist due to an overfull queue. Trimmed primary queue, a subsequent retry may succeed");
|
||||
} else {
|
||||
logger.warn("Failed to persist queue {}::{} due to overfull queue; will unlink device", accountUuid, deviceId);
|
||||
accountsManager.removeDevice(account, deviceId)
|
||||
.thenRun(() -> disconnectionRequestManager.requestDisconnection(accountUuid))
|
||||
.join();
|
||||
accountsManager.removeDevice(account, deviceId).join();
|
||||
}
|
||||
} finally {
|
||||
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
|
||||
|
||||
@@ -87,7 +87,6 @@ record CommandDependencies(
|
||||
AccountsManager accountsManager,
|
||||
ProfilesManager profilesManager,
|
||||
ReportMessageManager reportMessageManager,
|
||||
DisconnectionRequestManager disconnectionRequestManager,
|
||||
MessagesCache messagesCache,
|
||||
MessagesManager messagesManager,
|
||||
KeysManager keysManager,
|
||||
@@ -333,7 +332,6 @@ record CommandDependencies(
|
||||
accountsManager,
|
||||
profilesManager,
|
||||
reportMessageManager,
|
||||
disconnectionRequestManager,
|
||||
messagesCache,
|
||||
messagesManager,
|
||||
keys,
|
||||
|
||||
@@ -65,8 +65,6 @@ public class MessagePersisterServiceCommand extends ServerCommand<WhisperServerC
|
||||
deps.messagesManager(),
|
||||
deps.accountsManager(),
|
||||
deps.dynamicConfigurationManager(),
|
||||
new ExperimentEnrollmentManager(deps.dynamicConfigurationManager()),
|
||||
deps.disconnectionRequestManager(),
|
||||
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
|
||||
namespace.getInt(WORKER_COUNT));
|
||||
|
||||
|
||||
@@ -110,10 +110,7 @@ public class RemoveExpiredLinkedDevicesCommand extends AbstractSinglePassCrawlAc
|
||||
|
||||
final Mono<Long> accountUpdate = dryRun
|
||||
? Mono.just((long) expiredDevices.size())
|
||||
: deleteDevices(account, expiredDevices, maxRetries)
|
||||
.flatMap(count ->
|
||||
Mono.fromCompletionStage(getCommandDependencies().disconnectionRequestManager().requestDisconnection(account.getUuid()))
|
||||
.then(Mono.just(count)));
|
||||
: deleteDevices(account, expiredDevices, maxRetries);
|
||||
|
||||
return accountUpdate
|
||||
.doOnNext(successCounter::increment)
|
||||
|
||||
@@ -99,8 +99,7 @@ class MessagePersisterIntegrationTest {
|
||||
webSocketConnectionEventManager.start();
|
||||
|
||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
|
||||
dynamicConfigurationManager, mock(ExperimentEnrollmentManager.class), mock(DisconnectionRequestManager.class),
|
||||
PERSIST_DELAY, 1);
|
||||
dynamicConfigurationManager, PERSIST_DELAY, 1);
|
||||
|
||||
account = mock(Account.class);
|
||||
|
||||
|
||||
@@ -20,7 +20,6 @@ import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.whispersystems.textsecuregcm.util.MockUtils.exactly;
|
||||
|
||||
@@ -51,11 +50,9 @@ import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.whispersystems.textsecuregcm.auth.DisconnectionRequestManager;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagePersisterConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisClusterExtension;
|
||||
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
|
||||
@@ -78,7 +75,6 @@ class MessagePersisterTest {
|
||||
private MessagePersister messagePersister;
|
||||
private AccountsManager accountsManager;
|
||||
private MessagesManager messagesManager;
|
||||
private DisconnectionRequestManager disconnectionRequestManager;
|
||||
private Account destinationAccount;
|
||||
|
||||
private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID();
|
||||
@@ -99,7 +95,6 @@ class MessagePersisterTest {
|
||||
|
||||
messagesDynamoDb = mock(MessagesDynamoDb.class);
|
||||
accountsManager = mock(AccountsManager.class);
|
||||
disconnectionRequestManager = mock(DisconnectionRequestManager.class);
|
||||
destinationAccount = mock(Account.class);
|
||||
|
||||
when(accountsManager.getByAccountIdentifier(DESTINATION_ACCOUNT_UUID)).thenReturn(Optional.of(destinationAccount));
|
||||
@@ -122,8 +117,7 @@ class MessagePersisterTest {
|
||||
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
|
||||
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
|
||||
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
|
||||
dynamicConfigurationManager, mock(ExperimentEnrollmentManager.class), disconnectionRequestManager,
|
||||
PERSIST_DELAY, 1);
|
||||
dynamicConfigurationManager, PERSIST_DELAY, 1);
|
||||
|
||||
when(messagesManager.clear(any(UUID.class), anyByte())).thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
@@ -305,7 +299,6 @@ class MessagePersisterTest {
|
||||
assertTimeoutPreemptively(Duration.ofSeconds(1), () ->
|
||||
messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test"));
|
||||
verify(accountsManager, exactly()).removeDevice(destinationAccount, DESTINATION_DEVICE_ID);
|
||||
verify(disconnectionRequestManager, exactly()).requestDisconnection(DESTINATION_ACCOUNT_UUID);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -407,7 +400,6 @@ class MessagePersisterTest {
|
||||
when(accountsManager.removeDevice(destinationAccount, DESTINATION_DEVICE_ID)).thenReturn(CompletableFuture.failedFuture(new TimeoutException()));
|
||||
|
||||
assertThrows(CompletionException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE, "test"));
|
||||
verifyNoMoreInteractions(disconnectionRequestManager);
|
||||
}
|
||||
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
|
||||
@@ -74,7 +74,6 @@ class FinishPushNotificationExperimentCommandTest {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
pushNotificationExperimentSamples,
|
||||
null,
|
||||
null,
|
||||
|
||||
@@ -50,7 +50,6 @@ class NotifyIdleDevicesCommandTest {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
messagesManager,
|
||||
null,
|
||||
null,
|
||||
|
||||
@@ -18,7 +18,6 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Accounts;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamoDbRecoveryManager;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@@ -51,7 +50,6 @@ class RegenerateSecondaryDynamoDbTableDataCommandTest {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
dynamoDbRecoveryManager);
|
||||
|
||||
namespace = new Namespace(Map.of(
|
||||
|
||||
@@ -63,7 +63,6 @@ class StartPushNotificationExperimentCommandTest {
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
pushNotificationExperimentSamples,
|
||||
null,
|
||||
null,
|
||||
|
||||
Reference in New Issue
Block a user