replicate uuids to contact discovery

This commit is contained in:
Jeffrey Griffin
2019-06-29 08:26:18 -07:00
committed by Moxie Marlinspike
parent 7a3a385569
commit 07822b371f
12 changed files with 59 additions and 38 deletions

View File

@@ -357,7 +357,7 @@ public class AccountController {
accounts.update(account);
if (!wasAccountEnabled && account.isEnabled()) {
directoryQueue.addRegisteredUser(account.getNumber());
directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber());
}
}
@@ -373,7 +373,7 @@ public class AccountController {
accounts.update(account);
if (!account.isEnabled()) {
directoryQueue.deleteRegisteredUser(account.getNumber());
directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber());
}
}
@@ -393,7 +393,7 @@ public class AccountController {
accounts.update(account);
if (!wasAccountEnabled && account.isEnabled()) {
directoryQueue.addRegisteredUser(account.getNumber());
directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber());
}
}
@@ -409,7 +409,7 @@ public class AccountController {
accounts.update(account);
if (!account.isEnabled()) {
directoryQueue.deleteRegisteredUser(account.getNumber());
directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber());
}
}
@@ -617,9 +617,9 @@ public class AccountController {
}
if (account.isEnabled()) {
directoryQueue.addRegisteredUser(number);
directoryQueue.addRegisteredUser(account.getUuid(), number);
} else {
directoryQueue.deleteRegisteredUser(number);
directoryQueue.deleteRegisteredUser(account.getUuid(), number);
}
messagesManager.clear(number);

View File

@@ -113,7 +113,7 @@ public class DeviceController {
accounts.update(account);
if (!account.isEnabled()) {
directoryQueue.deleteRegisteredUser(account.getNumber());
directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber());
}
messages.clear(account.getNumber(), deviceId);

View File

@@ -107,7 +107,7 @@ public class KeysController {
accounts.update(account);
if (!wasAccountEnabled && account.isEnabled()) {
directoryQueue.addRegisteredUser(account.getNumber());
directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber());
}
}
@@ -173,7 +173,7 @@ public class KeysController {
accounts.update(account);
if (!wasAccountEnabled && account.isEnabled()) {
directoryQueue.addRegisteredUser(account.getNumber());
directoryQueue.addRegisteredUser(account.getUuid(), account.getNumber());
}
}

View File

@@ -29,15 +29,19 @@ public class DirectoryReconciliationRequest {
@JsonProperty
private UUID toUuid;
@JsonProperty
private List<UUID> uuids;
@JsonProperty
private List<String> numbers;
public DirectoryReconciliationRequest() {
}
public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List<String> numbers) {
public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List<UUID> uuids, List<String> numbers) {
this.fromUuid = fromUuid;
this.toUuid = toUuid;
this.uuids = uuids;
this.numbers = numbers;
}
@@ -49,6 +53,10 @@ public class DirectoryReconciliationRequest {
return toUuid;
}
public List<UUID> getUuids() {
return uuids;
}
public List<String> getNumbers() {
return numbers;
}

View File

@@ -35,6 +35,7 @@ import org.whispersystems.textsecuregcm.util.Constants;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
@@ -52,28 +53,29 @@ public class DirectoryQueue {
public DirectoryQueue(SqsConfiguration sqsConfig) {
final AWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getAccessSecret());
final AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
this.queueUrl = sqsConfig.getQueueUrl();
this.sqs = AmazonSQSClientBuilder.standard().withRegion(sqsConfig.getRegion()).withCredentials(credentialsProvider).build();
}
public void addRegisteredUser(String user) {
sendMessage("add", user);
public void addRegisteredUser(UUID uuid, String number) {
sendMessage("add", uuid, number);
}
public void deleteRegisteredUser(String user) {
sendMessage("delete", user);
public void deleteRegisteredUser(UUID uuid, String number) {
sendMessage("delete", uuid, number);
}
private void sendMessage(String action, String user) {
private void sendMessage(String action, UUID uuid, String number) {
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("id", new MessageAttributeValue().withDataType("String").withStringValue(user));
messageAttributes.put("id", new MessageAttributeValue().withDataType("String").withStringValue(number));
messageAttributes.put("uuid", new MessageAttributeValue().withDataType("String").withStringValue(uuid.toString()));
messageAttributes.put("action", new MessageAttributeValue().withDataType("String").withStringValue(action));
SendMessageRequest sendMessageRequest = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("-")
.withMessageDeduplicationId(user + action)
.withMessageGroupId(user)
.withMessageDeduplicationId(UUID.randomUUID().toString())
.withMessageGroupId(number)
.withMessageAttributes(messageAttributes);
try {
sqs.sendMessage(sendMessageRequest);

View File

@@ -68,7 +68,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
accountUpdateCount++;
accountsManager.update(account);
directoryQueue.deleteRegisteredUser(account.getNumber());
directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber());
}
}
}

View File

@@ -30,6 +30,8 @@ import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import javax.ws.rs.ProcessingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -57,7 +59,7 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
public void onCrawlStart() { }
public void onCrawlEnd(Optional<UUID> fromUuid) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList(), Collections.emptyList());
DirectoryReconciliationResponse response = sendChunk(request);
}
@@ -93,10 +95,14 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private DirectoryReconciliationRequest createChunkRequest(Optional<UUID> fromUuid, List<Account> accounts) {
List<String> numbers = accounts.stream()
.filter(Account::isEnabled)
.map(Account::getNumber)
.collect(Collectors.toList());
List<UUID> uuids = new ArrayList<>(accounts.size());
List<String> numbers = new ArrayList<>(accounts.size());
for (Account account : accounts) {
if (account.isEnabled()) {
uuids.add(account.getUuid());
numbers.add(account.getNumber());
}
}
Optional<UUID> toUuid = Optional.empty();
@@ -104,7 +110,7 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
}
return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), numbers);
return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), uuids, numbers);
}
private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {

View File

@@ -59,7 +59,7 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
accountsManager.update(account);
if (!account.isEnabled()) {
directoryQueue.deleteRegisteredUser(account.getNumber());
directoryQueue.deleteRegisteredUser(account.getUuid(), account.getNumber());
}
}
}

View File

@@ -88,7 +88,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
device.get().setAuthenticationCredentials(new AuthenticationCredentials(Base64.encodeBytes(random)));
accountsManager.update(account.get());
directoryQueue.deleteRegisteredUser(account.get().getNumber());
directoryQueue.deleteRegisteredUser(account.get().getUuid(), account.get().getNumber());
logger.warn("Removed " + account.get().getNumber());
} else {