Reconcile inactive and undiscoverable accounts when using v3 endpoints

This commit is contained in:
Chris Eager
2021-07-28 11:27:23 -05:00
committed by Chris Eager
parent 331ff83cd5
commit df9c0051c9
7 changed files with 173 additions and 121 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.sqs;
@@ -95,12 +95,8 @@ public class DirectoryQueue implements Managed {
sqs.close();
}
public boolean isDiscoverable(final Account account) {
return account.isEnabled() && account.isDiscoverableByPhoneNumber();
}
public void refreshAccount(final Account account) {
sendUpdateMessage(account, isDiscoverable(account) ? UpdateAction.ADD : UpdateAction.DELETE);
sendUpdateMessage(account, account.shouldBeVisibleInDirectory() ? UpdateAction.ADD : UpdateAction.DELETE);
}
public void deleteAccount(final Account account) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
@@ -327,7 +327,7 @@ public class Account implements Principal {
return new StoredRegistrationLock(Optional.ofNullable(registrationLock), Optional.ofNullable(registrationLockSalt), getLastSeen());
}
public Optional<byte[]> getUnidentifiedAccessKey() {
requireNotStale();
@@ -372,6 +372,12 @@ public class Account implements Principal {
this.discoverableByPhoneNumber = discoverableByPhoneNumber;
}
public boolean shouldBeVisibleInDirectory() {
requireNotStale();
return isEnabled() && isDiscoverableByPhoneNumber();
}
public int getVersion() {
requireNotStale();

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
@@ -250,7 +250,7 @@ public class AccountsManager {
public Account update(Account account, Consumer<Account> updater) {
final boolean wasDiscoverableBeforeUpdate = directoryQueue.isDiscoverable(account);
final boolean wasVisibleBeforeUpdate = account.shouldBeVisibleInDirectory();
final Account updatedAccount;
@@ -293,9 +293,9 @@ public class AccountsManager {
redisSet(updatedAccount);
}
final boolean isDiscoverableAfterUpdate = directoryQueue.isDiscoverable(updatedAccount);
final boolean isVisibleAfterUpdate = updatedAccount.shouldBeVisibleInDirectory();
if (wasDiscoverableBeforeUpdate != isDiscoverableAfterUpdate) {
if (wasVisibleBeforeUpdate != isVisibleAfterUpdate) {
directoryQueue.refreshAccount(updatedAccount);
}

View File

@@ -1,105 +1,134 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import io.micrometer.core.instrument.Metrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import javax.ws.rs.ProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse.Status;
public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final String SEND_TIMER_NAME = name(DirectoryReconciler.class, "sendRequest");
private final String replicationName;
private final DirectoryReconciliationClient reconciliationClient;
private final Timer sendChunkTimer;
private final Meter sendChunkErrorMeter;
private boolean useV3Endpoints;
public DirectoryReconciler(String name, DirectoryReconciliationClient reconciliationClient) {
public DirectoryReconciler(String replicationName, DirectoryReconciliationClient reconciliationClient) {
this.reconciliationClient = reconciliationClient;
sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk"));
sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, name, "sendChunkError"));
this.replicationName = replicationName;
}
@Override
public void onCrawlStart() { }
public void onCrawlStart() {
}
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
sendChunk(request);
if (useV3Endpoints) {
reconciliationClient.complete();
} else {
final DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null,
Collections.emptyList());
sendAdditions(request);
}
}
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts);
DirectoryReconciliationResponse response = sendChunk(request);
if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
protected void onCrawlChunk(final Optional<UUID> fromUuid, final List<Account> accounts)
throws AccountDatabaseCrawlerRestartException {
final DirectoryReconciliationRequest addUsersRequest;
final DirectoryReconciliationRequest deleteUsersRequest;
{
final List<DirectoryReconciliationRequest.User> addedUsers = new ArrayList<>(accounts.size());
final List<DirectoryReconciliationRequest.User> deletedUsers = new ArrayList<>(accounts.size());
accounts.forEach(account -> {
if (account.shouldBeVisibleInDirectory()) {
addedUsers.add(new DirectoryReconciliationRequest.User(account.getUuid(), account.getNumber()));
} else {
deletedUsers.add(new DirectoryReconciliationRequest.User(account.getUuid(), account.getNumber()));
}
});
final Optional<UUID> toUuid;
if (!accounts.isEmpty()) {
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
} else {
toUuid = Optional.empty();
}
addUsersRequest = new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), addedUsers);
deleteUsersRequest = new DirectoryReconciliationRequest(null, null, deletedUsers);
}
final DirectoryReconciliationResponse addUsersResponse = sendAdditions(addUsersRequest);
final DirectoryReconciliationResponse deleteUsersResponse = sendDeletes(deleteUsersRequest);
if (addUsersResponse.getStatus() == DirectoryReconciliationResponse.Status.MISSING
|| deleteUsersResponse.getStatus() == Status.MISSING) {
throw new AccountDatabaseCrawlerRestartException("directory reconciler missing");
}
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private DirectoryReconciliationRequest createChunkRequest(Optional<UUID> fromUuid, List<Account> accounts) {
List<DirectoryReconciliationRequest.User> users = new ArrayList<>(accounts.size());
for (Account account : accounts) {
if (account.isEnabled() && account.isDiscoverableByPhoneNumber()) {
users.add(new DirectoryReconciliationRequest.User(account.getUuid(), account.getNumber()));
}
private DirectoryReconciliationResponse sendDeletes(final DirectoryReconciliationRequest request) {
if (useV3Endpoints) {
return sendRequest(request, reconciliationClient::delete, "delete");
}
Optional<UUID> toUuid = Optional.empty();
if (!accounts.isEmpty()) {
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
}
return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), users);
return new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
}
private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
try (Timer.Context timer = sendChunkTimer.time()) {
DirectoryReconciliationResponse response;
if (useV3Endpoints) {
response = reconciliationClient.sendChunkV3(request);
} else {
response = reconciliationClient.sendChunk(request);
}
if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
sendChunkErrorMeter.mark();
logger.warn("reconciliation error: " + response.getStatus());
}
return response;
} catch (ProcessingException ex) {
sendChunkErrorMeter.mark();
logger.warn("request error: ", ex);
throw new ProcessingException(ex);
private DirectoryReconciliationResponse sendAdditions(final DirectoryReconciliationRequest request) {
if (useV3Endpoints) {
return sendRequest(request, reconciliationClient::sendChunkV3, "add");
}
return sendRequest(request, reconciliationClient::sendChunk, "add_v2");
}
private DirectoryReconciliationResponse sendRequest(final DirectoryReconciliationRequest request,
final Function<DirectoryReconciliationRequest, DirectoryReconciliationResponse> requestHandler,
final String context) {
return Metrics.timer(SEND_TIMER_NAME, "context", context, "replication", replicationName)
.record(() -> {
try {
final DirectoryReconciliationResponse response = requestHandler.apply(request);
if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
logger.warn("reconciliation error: " + response.getStatus());
}
return response;
} catch (ProcessingException ex) {
logger.warn("request error: ", ex);
throw new ProcessingException(ex);
}
});
}
public void setUseV3Endpoints(final boolean useV3Endpoints) {
this.useV3Endpoints = useV3Endpoints;
}
}