revert to phone number-based account crawler

This commit is contained in:
Jeffrey Griffin
2019-08-26 14:00:15 -07:00
parent 284428a45a
commit cf78047830
16 changed files with 118 additions and 160 deletions

View File

@@ -19,11 +19,10 @@ package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
public class ActiveUserTally {
@JsonProperty
private UUID fromUuid;
private String fromNumber;
@JsonProperty
private Map<String, long[]> platforms;
@@ -33,14 +32,14 @@ public class ActiveUserTally {
public ActiveUserTally() {}
public ActiveUserTally(UUID fromUuid, Map<String, long[]> platforms, Map<String, long[]> countries) {
this.fromUuid = fromUuid;
public ActiveUserTally(String fromNumber, Map<String, long[]> platforms, Map<String, long[]> countries) {
this.fromNumber = fromNumber;
this.platforms = platforms;
this.countries = countries;
}
public UUID getFromUuid() {
return this.fromUuid;
public String getFromNumber() {
return this.fromNumber;
}
public Map<String, long[]> getPlatforms() {
@@ -51,8 +50,8 @@ public class ActiveUserTally {
return this.countries;
}
public void setFromUuid(UUID fromUuid) {
this.fromUuid = fromUuid;
public void setFromNumber(String fromNumber) {
this.fromNumber = fromNumber;
}
}

View File

@@ -19,18 +19,14 @@ package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.UUID;
public class DirectoryReconciliationRequest {
@JsonProperty
private UUID fromUuid;
private String fromNumber;
@JsonProperty
private UUID toUuid;
@JsonProperty
private List<UUID> uuids;
private String toNumber;
@JsonProperty
private List<String> numbers;
@@ -38,23 +34,18 @@ public class DirectoryReconciliationRequest {
public DirectoryReconciliationRequest() {
}
public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List<UUID> uuids, List<String> numbers) {
this.fromUuid = fromUuid;
this.toUuid = toUuid;
this.uuids = uuids;
this.numbers = numbers;
public DirectoryReconciliationRequest(String fromNumber, String toNumber, List<String> numbers) {
this.fromNumber = fromNumber;
this.toNumber = toNumber;
this.numbers = numbers;
}
public UUID getFromUuid() {
return fromUuid;
public String getFromNumber() {
return fromNumber;
}
public UUID getToUuid() {
return toUuid;
}
public List<UUID> getUuids() {
return uuids;
public String getToNumber() {
return toNumber;
}
public List<String> getNumbers() {

View File

@@ -26,7 +26,6 @@ import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@@ -52,7 +51,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
int accountUpdateCount = 0;
for (Account account : chunkAccounts) {
if (needsExplicitRemoval(account)) {
@@ -75,7 +74,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlEnd(Optional<UUID> fromUuid) {
public void onCrawlEnd(Optional<String> fromNumber) {
}
private boolean needsExplicitRemoval(Account account) {

View File

@@ -122,26 +122,26 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
private void processChunk() {
Optional<UUID> fromUuid = cache.getLastUuid();
Optional<String> fromNumber = cache.getLastNumber();
if (!fromUuid.isPresent()) {
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
if (!fromNumber.isPresent()) {
listeners.forEach(listener -> { listener.onCrawlStart(); });
}
List<Account> chunkAccounts = readChunk(fromUuid, chunkSize);
List<Account> chunkAccounts = readChunk(fromNumber, chunkSize);
if (chunkAccounts.isEmpty()) {
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
cache.setLastUuid(Optional.empty());
listeners.forEach(listener -> { listener.onCrawlEnd(fromNumber); });
cache.setLastNumber(Optional.empty());
cache.clearAccelerate();
} else {
try {
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.onCrawlChunk(fromUuid, chunkAccounts);
listener.onCrawlChunk(fromNumber, chunkAccounts);
}
cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid()));
cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber()));
} catch (AccountDatabaseCrawlerRestartException e) {
cache.setLastUuid(Optional.empty());
cache.setLastNumber(Optional.empty());
cache.clearAccelerate();
}
@@ -149,12 +149,12 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
private List<Account> readChunk(Optional<UUID> fromUuid, int chunkSize) {
private List<Account> readChunk(Optional<String> fromNumber, int chunkSize) {
try (Timer.Context timer = readChunkTimer.time()) {
List<Account> chunkAccounts;
if (fromUuid.isPresent()) {
chunkAccounts = accounts.getAllFrom(fromUuid.get(), chunkSize);
if (fromNumber.isPresent()) {
chunkAccounts = accounts.getAllFrom(fromNumber.get(), chunkSize);
} else {
chunkAccounts = accounts.getAllFrom(chunkSize);
}

View File

@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import redis.clients.jedis.Jedis;
@@ -31,7 +30,7 @@ import redis.clients.jedis.Jedis;
public class AccountDatabaseCrawlerCache {
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
private static final String LAST_NUMBER_KEY = "account_database_crawler_cache_last_number";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
@@ -68,21 +67,18 @@ public class AccountDatabaseCrawlerCache {
luaScript.execute(keys, args);
}
public Optional<UUID> getLastUuid() {
public Optional<String> getLastNumber() {
try (Jedis jedis = jedisPool.getWriteResource()) {
String lastUuidString = jedis.get(LAST_UUID_KEY);
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
return Optional.ofNullable(jedis.get(LAST_NUMBER_KEY));
}
}
public void setLastUuid(Optional<UUID> lastUuid) {
public void setLastNumber(Optional<String> lastNumber) {
try (Jedis jedis = jedisPool.getWriteResource()) {
if (lastUuid.isPresent()) {
jedis.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString());
if (lastNumber.isPresent()) {
jedis.psetex(LAST_NUMBER_KEY, LAST_NUMBER_TTL_MS, lastNumber.get());
} else {
jedis.del(LAST_UUID_KEY);
jedis.del(LAST_NUMBER_KEY);
}
}
}

View File

@@ -18,11 +18,10 @@ package org.whispersystems.textsecuregcm.storage;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public interface AccountDatabaseCrawlerListener {
void onCrawlStart();
void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
void onCrawlEnd(Optional<UUID> fromUuid);
void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
void onCrawlEnd(Optional<String> fromNumber);
}

View File

@@ -110,10 +110,10 @@ public class Accounts {
}));
}
public List<Account> getAllFrom(UUID from, int length) {
public List<Account> getAllFrom(String from, int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromOffsetTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " > :from ORDER BY " + UID + " LIMIT :limit")
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit")
.bind("from", from)
.bind("limit", length)
.mapTo(Account.class)
@@ -125,7 +125,7 @@ public class Accounts {
public List<Account> getAllFrom(int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromTimer.time()) {
return handle.createQuery("SELECT * FROM accounts ORDER BY " + UID + " LIMIT :limit")
return handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit")
.bind("limit", length)
.mapTo(Account.class)
.list();

View File

@@ -121,8 +121,8 @@ public class AccountsManager {
return accounts.getAllFrom(length);
}
public List<Account> getAllFrom(UUID uuid, int length) {
return accounts.getAllFrom(uuid, length);
public List<Account> getAllFrom(String number, int length) {
return accounts.getAllFrom(number, length);
}
private void updateDirectory(Account account) {

View File

@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import io.dropwizard.metrics.MetricsFactory;
@@ -44,6 +43,8 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
private static final String PLATFORM_IOS = "ios";
private static final String PLATFORM_ANDROID = "android";
private static final String FIRST_FROM_NUMBER = "+";
private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"};
private final MetricsFactory metricsFactory;
@@ -63,7 +64,7 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlChunk(Optional<UUID> fromNumber, List<Account> chunkAccounts) {
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1),
TimeUnit.DAYS.toMillis(nowDays - 7),
@@ -106,11 +107,12 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
}
}
incrementTallies(fromNumber.orElse(UUID.randomUUID()), platformIncrements, countryIncrements);
incrementTallies(fromNumber.orElse(FIRST_FROM_NUMBER), platformIncrements, countryIncrements);
}
@Override
public void onCrawlEnd(Optional<UUID> fromNumber) {
public void onCrawlEnd(Optional<String> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
@@ -154,18 +156,17 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
return tally;
}
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
private void incrementTallies(String fromNumber, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
try (Jedis jedis = jedisPool.getWriteResource()) {
String tallyValue = jedis.get(TALLY_KEY);
ActiveUserTally activeUserTally;
if (tallyValue == null) {
activeUserTally = new ActiveUserTally(fromUuid, platformIncrements, countryIncrements);
activeUserTally = new ActiveUserTally(fromNumber, platformIncrements, countryIncrements);
} else {
activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class);
if (!fromUuid.equals(activeUserTally.getFromUuid())) {
activeUserTally.setFromUuid(fromUuid);
if (activeUserTally.getFromNumber() != fromNumber) {
activeUserTally.setFromNumber(fromNumber);
Map<String, long[]> platformTallies = activeUserTally.getPlatforms();
addTallyMaps(platformTallies, platformIncrements);
Map<String, long[]> countryTallies = activeUserTally.getCountries();

View File

@@ -30,12 +30,9 @@ import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import javax.ws.rs.ProcessingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
@@ -58,16 +55,16 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
public void onCrawlStart() { }
public void onCrawlEnd(Optional<UUID> fromUuid) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList(), Collections.emptyList());
public void onCrawlEnd(Optional<String> fromNumber) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromNumber.orElse(null), null, Collections.emptyList());
DirectoryReconciliationResponse response = sendChunk(request);
}
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
updateDirectoryCache(chunkAccounts);
DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts);
DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts);
DirectoryReconciliationResponse response = sendChunk(request);
if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
throw new AccountDatabaseCrawlerRestartException("directory reconciler missing");
@@ -94,23 +91,19 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private DirectoryReconciliationRequest createChunkRequest(Optional<UUID> fromUuid, List<Account> accounts) {
List<UUID> uuids = new ArrayList<>(accounts.size());
List<String> numbers = new ArrayList<>(accounts.size());
for (Account account : accounts) {
if (account.isEnabled()) {
uuids.add(account.getUuid());
numbers.add(account.getNumber());
}
}
private DirectoryReconciliationRequest createChunkRequest(Optional<String> fromNumber, List<Account> accounts) {
List<String> numbers = accounts.stream()
.filter(Account::isEnabled)
.map(Account::getNumber)
.collect(Collectors.toList());
Optional<UUID> toUuid = Optional.empty();
Optional<String> toNumber = Optional.empty();
if (!accounts.isEmpty()) {
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber());
}
return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), uuids, numbers);
return new DirectoryReconciliationRequest(fromNumber.orElse(null), toNumber.orElse(null), numbers);
}
private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {

View File

@@ -9,7 +9,6 @@ import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@@ -32,7 +31,7 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
public void onCrawlStart() {}
@Override
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
for (Account account : chunkAccounts) {
boolean update = false;
@@ -66,5 +65,5 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlEnd(Optional<UUID> toUuid) {}
public void onCrawlEnd(Optional<String> fromNumber) {}
}