Support for UUID based addressing

This commit is contained in:
Moxie Marlinspike
2019-06-20 19:25:15 -07:00
parent 0f8cb7ea6d
commit 7a3a385569
51 changed files with 1379 additions and 695 deletions

View File

@@ -45,7 +45,6 @@ import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.controllers.ProfileController;
import org.whispersystems.textsecuregcm.controllers.ProvisioningController;
import org.whispersystems.textsecuregcm.controllers.SecureStorageController;
import org.whispersystems.textsecuregcm.controllers.TransparentDataController;
import org.whispersystems.textsecuregcm.controllers.StickerController;
import org.whispersystems.textsecuregcm.controllers.VoiceVerificationController;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
@@ -261,7 +260,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.jersey().register(new ProvisioningController(rateLimiters, pushSender));
environment.jersey().register(new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().getCertificate(), config.getDeliveryCertificate().getPrivateKey(), config.getDeliveryCertificate().getExpiresDays())));
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(), config.getVoiceVerificationConfiguration().getLocales()));
environment.jersey().register(new TransparentDataController(accountsManager, config.getTransparentDataIndex()));
environment.jersey().register(new SecureStorageController(storageCredentialsGenerator));
environment.jersey().register(attachmentControllerV1);
environment.jersey().register(attachmentControllerV2);

View File

@@ -0,0 +1,35 @@
package org.whispersystems.textsecuregcm.auth;
import java.util.UUID;
public class AmbiguousIdentifier {
private final UUID uuid;
private final String number;
public AmbiguousIdentifier(String target) {
if (target.startsWith("+")) {
this.uuid = null;
this.number = target;
} else {
this.uuid = UUID.fromString(target);
this.number = null;
}
}
public UUID getUuid() {
return uuid;
}
public String getNumber() {
return number;
}
public boolean hasUuid() {
return uuid != null;
}
public boolean hasNumber() {
return number != null;
}
}

View File

@@ -24,20 +24,20 @@ import java.io.IOException;
public class AuthorizationHeader {
private final String number;
private final long accountId;
private final String password;
private final AmbiguousIdentifier identifier;
private final long deviceId;
private final String password;
private AuthorizationHeader(String number, long accountId, String password) {
this.number = number;
this.accountId = accountId;
this.password = password;
private AuthorizationHeader(AmbiguousIdentifier identifier, long deviceId, String password) {
this.identifier = identifier;
this.deviceId = deviceId;
this.password = password;
}
public static AuthorizationHeader fromUserAndPassword(String user, String password) throws InvalidAuthorizationHeaderException {
try {
String[] numberAndId = user.split("\\.");
return new AuthorizationHeader(numberAndId[0],
return new AuthorizationHeader(new AmbiguousIdentifier(numberAndId[0]),
numberAndId.length > 1 ? Long.parseLong(numberAndId[1]) : 1,
password);
} catch (NumberFormatException nfe) {
@@ -79,12 +79,12 @@ public class AuthorizationHeader {
}
}
public String getNumber() {
return number;
public AmbiguousIdentifier getIdentifier() {
return identifier;
}
public long getDeviceId() {
return accountId;
return deviceId;
}
public String getPassword() {

View File

@@ -12,6 +12,7 @@ import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.auth.basic.BasicCredentials;
@@ -38,7 +39,7 @@ public class BaseAccountAuthenticator {
public Optional<Account> authenticate(BasicCredentials basicCredentials, boolean enabledRequired) {
try {
AuthorizationHeader authorizationHeader = AuthorizationHeader.fromUserAndPassword(basicCredentials.getUsername(), basicCredentials.getPassword());
Optional<Account> account = accountsManager.get(authorizationHeader.getNumber());
Optional<Account> account = accountsManager.get(authorizationHeader.getIdentifier());
if (!account.isPresent()) {
noSuchAccountMeter.mark();
@@ -73,7 +74,7 @@ public class BaseAccountAuthenticator {
authenticationFailedMeter.mark();
return Optional.empty();
} catch (InvalidAuthorizationHeaderException iahe) {
} catch (IllegalArgumentException | InvalidAuthorizationHeaderException iae) {
invalidAuthHeaderMeter.mark();
return Optional.empty();
}

View File

@@ -31,6 +31,7 @@ public class CertificateGenerator {
public byte[] createFor(Account account, Device device) throws IOException, InvalidKeyException {
byte[] certificate = SenderCertificate.Certificate.newBuilder()
.setSender(account.getNumber())
.setSenderUuid(account.getUuid().toString())
.setSenderDevice(Math.toIntExact(device.getId()))
.setExpires(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(expiresDays))
.setIdentityKey(ByteString.copyFrom(Base64.decode(account.getIdentityKey())))

View File

@@ -33,6 +33,7 @@ import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.auth.TurnToken;
import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator;
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
import org.whispersystems.textsecuregcm.entities.AccountCreationResult;
import org.whispersystems.textsecuregcm.entities.ApnRegistrationId;
import org.whispersystems.textsecuregcm.entities.DeviceName;
import org.whispersystems.textsecuregcm.entities.GcmRegistrationId;
@@ -78,6 +79,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@@ -245,17 +247,21 @@ public class AccountController {
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/code/{verification_code}")
public void verifyAccount(@PathParam("verification_code") String verificationCode,
@HeaderParam("Authorization") String authorizationHeader,
@HeaderParam("X-Signal-Agent") String userAgent,
@Valid AccountAttributes accountAttributes)
public AccountCreationResult verifyAccount(@PathParam("verification_code") String verificationCode,
@HeaderParam("Authorization") String authorizationHeader,
@HeaderParam("X-Signal-Agent") String userAgent,
@Valid AccountAttributes accountAttributes)
throws RateLimitExceededException
{
try {
AuthorizationHeader header = AuthorizationHeader.fromFullHeader(authorizationHeader);
String number = header.getNumber();
String number = header.getIdentifier().getNumber();
String password = header.getPassword();
if (number == null) {
throw new WebApplicationException(400);
}
rateLimiters.getVerifyLimiter().validate(number);
Optional<StoredVerificationCode> storedVerificationCode = pendingAccounts.getCodeForNumber(number);
@@ -308,9 +314,11 @@ public class AccountController {
rateLimiters.getPinLimiter().clear(number);
}
createAccount(number, password, userAgent, accountAttributes);
Account account = createAccount(number, password, userAgent, accountAttributes);
metricRegistry.meter(name(AccountController.class, "verify", Util.getCountryCode(number))).mark();
return new AccountCreationResult(account.getUuid());
} catch (InvalidAuthorizationHeaderException e) {
logger.info("Bad Authorization Header", e);
throw new WebApplicationException(Response.status(401).build());
@@ -502,6 +510,13 @@ public class AccountController {
accounts.update(account);
}
@GET
@Path("/whoami")
@Produces(MediaType.APPLICATION_JSON)
public AccountCreationResult whoAmI(@Auth Account account) {
return new AccountCreationResult(account.getUuid());
}
private CaptchaRequirement requiresCaptcha(String number, String transport, String forwardedFor,
String requester,
Optional<String> captchaToken,
@@ -576,7 +591,7 @@ public class AccountController {
return false;
}
private void createAccount(String number, String password, String userAgent, AccountAttributes accountAttributes) {
private Account createAccount(String number, String password, String userAgent, AccountAttributes accountAttributes) {
Device device = new Device();
device.setId(Device.MASTER_ID);
device.setAuthenticationCredentials(new AuthenticationCredentials(password));
@@ -591,6 +606,7 @@ public class AccountController {
Account account = new Account();
account.setNumber(number);
account.setUuid(UUID.randomUUID());
account.addDevice(device);
account.setPin(accountAttributes.getPin());
account.setUnidentifiedAccessKey(accountAttributes.getUnidentifiedAccessKey());
@@ -608,6 +624,8 @@ public class AccountController {
messagesManager.clear(number);
pendingAccounts.remove(number);
return account;
}
@VisibleForTesting protected

View File

@@ -164,9 +164,11 @@ public class DeviceController {
{
try {
AuthorizationHeader header = AuthorizationHeader.fromFullHeader(authorizationHeader);
String number = header.getNumber();
String number = header.getIdentifier().getNumber();
String password = header.getPassword();
if (number == null) throw new WebApplicationException(400);
rateLimiters.getVerifyDeviceLimiter().validate(number);
Optional<StoredVerificationCode> storedVerificationCode = pendingDevices.getCodeForNumber(number);

View File

@@ -19,6 +19,7 @@ package org.whispersystems.textsecuregcm.controllers;
import com.codahale.metrics.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.auth.Anonymous;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
@@ -115,11 +116,11 @@ public class KeysController {
@Timed
@GET
@Path("/{number}/{device_id}")
@Path("/{identifier}/{device_id}")
@Produces(MediaType.APPLICATION_JSON)
public Optional<PreKeyResponse> getDeviceKeys(@Auth Optional<Account> account,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@PathParam("number") String number,
@PathParam("identifier") AmbiguousIdentifier targetName,
@PathParam("device_id") String deviceId)
throws RateLimitExceededException
{
@@ -127,13 +128,13 @@ public class KeysController {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
Optional<Account> target = accounts.get(number);
Optional<Account> target = accounts.get(targetName);
OptionalAccess.verify(account, accessKey, target, deviceId);
assert(target.isPresent());
if (account.isPresent()) {
rateLimiters.getPreKeysLimiter().validate(account.get().getNumber() + "__" + number + "." + deviceId);
rateLimiters.getPreKeysLimiter().validate(account.get().getNumber() + "__" + target.get().getNumber() + "." + deviceId);
}
List<KeyRecord> targetKeys = getLocalKeys(target.get(), deviceId);

View File

@@ -23,6 +23,7 @@ import com.codahale.metrics.annotation.Timed;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.auth.Anonymous;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
@@ -109,7 +110,7 @@ public class MessageController {
@Produces(MediaType.APPLICATION_JSON)
public SendMessageResponse sendMessage(@Auth Optional<Account> source,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@PathParam("destination") String destinationName,
@PathParam("destination") AmbiguousIdentifier destinationName,
@Valid IncomingMessageList messages)
throws RateLimitExceededException
{
@@ -117,18 +118,18 @@ public class MessageController {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
if (source.isPresent() && !source.get().getNumber().equals(destinationName)) {
if (source.isPresent() && !source.get().isFor(destinationName)) {
rateLimiters.getMessagesLimiter().validate(source.get().getNumber() + "__" + destinationName);
}
if (source.isPresent() && !source.get().getNumber().equals(destinationName)) {
if (source.isPresent() && !source.get().isFor(destinationName)) {
identifiedMeter.mark();
} else {
} else if (!source.isPresent()) {
unidentifiedMeter.mark();
}
try {
boolean isSyncMessage = source.isPresent() && source.get().getNumber().equals(destinationName);
boolean isSyncMessage = source.isPresent() && source.get().isFor(destinationName);
Optional<Account> destination;
@@ -246,6 +247,7 @@ public class MessageController {
if (source.isPresent()) {
messageBuilder.setSource(source.get().getNumber())
.setSourceUuid(source.get().getUuid().toString())
.setSourceDevice((int)source.get().getAuthenticatedDevice().get().getId());
}

View File

@@ -10,6 +10,7 @@ import com.codahale.metrics.annotation.Timed;
import org.apache.commons.codec.binary.Base64;
import org.hibernate.validator.constraints.Length;
import org.hibernate.validator.valuehandling.UnwrapValidatedValue;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.auth.Anonymous;
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.auth.UnidentifiedAccessChecksum;
@@ -79,10 +80,10 @@ public class ProfileController {
@Timed
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{number}")
@Path("/{identifier}")
public Profile getProfile(@Auth Optional<Account> requestAccount,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@PathParam("number") String number,
@PathParam("identifier") AmbiguousIdentifier identifier,
@QueryParam("ca") boolean useCaCertificate)
throws RateLimitExceededException
{
@@ -94,7 +95,7 @@ public class ProfileController {
rateLimiters.getProfileLimiter().validate(requestAccount.get().getNumber());
}
Optional<Account> accountProfile = accountsManager.get(number);
Optional<Account> accountProfile = accountsManager.get(identifier);
OptionalAccess.verify(requestAccount, accessKey, accountProfile);
//noinspection ConstantConditions,OptionalGetWithoutIsPresent

View File

@@ -1,42 +0,0 @@
package org.whispersystems.textsecuregcm.controllers;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.PublicAccount;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.Map;
import java.util.Optional;
@Path("/v1/transparency/")
public class TransparentDataController {
private final AccountsManager accountsManager;
private final Map<String, String> transparentDataIndex;
public TransparentDataController(AccountsManager accountsManager,
Map<String, String> transparentDataIndex)
{
this.accountsManager = accountsManager;
this.transparentDataIndex = transparentDataIndex;
}
@GET
@Path("/account/{id}")
@Produces(MediaType.APPLICATION_JSON)
public Optional<PublicAccount> getAccount(@PathParam("id") String id) {
String index = transparentDataIndex.get(id);
if (index != null) {
return accountsManager.get(index).map(PublicAccount::new);
}
return Optional.empty();
}
}

View File

@@ -0,0 +1,21 @@
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.UUID;
public class AccountCreationResult {
@JsonProperty
private UUID uuid;
public AccountCreationResult() {}
public AccountCreationResult(UUID uuid) {
this.uuid = uuid;
}
public UUID getUuid() {
return uuid;
}
}

View File

@@ -19,10 +19,11 @@ package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
public class ActiveUserTally {
@JsonProperty
private String fromNumber;
private UUID fromUuid;
@JsonProperty
private Map<String, long[]> platforms;
@@ -32,14 +33,14 @@ public class ActiveUserTally {
public ActiveUserTally() {}
public ActiveUserTally(String fromNumber, Map<String, long[]> platforms, Map<String, long[]> countries) {
this.fromNumber = fromNumber;
public ActiveUserTally(UUID fromUuid, Map<String, long[]> platforms, Map<String, long[]> countries) {
this.fromUuid = fromUuid;
this.platforms = platforms;
this.countries = countries;
}
public String getFromNumber() {
return this.fromNumber;
public UUID getFromUuid() {
return this.fromUuid;
}
public Map<String, long[]> getPlatforms() {
@@ -50,8 +51,8 @@ public class ActiveUserTally {
return this.countries;
}
public void setFromNumber(String fromNumber) {
this.fromNumber = fromNumber;
public void setFromUuid(UUID fromUuid) {
this.fromUuid = fromUuid;
}
}

View File

@@ -19,14 +19,15 @@ package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.UUID;
public class DirectoryReconciliationRequest {
@JsonProperty
private String fromNumber;
private UUID fromUuid;
@JsonProperty
private String toNumber;
private UUID toUuid;
@JsonProperty
private List<String> numbers;
@@ -34,18 +35,18 @@ public class DirectoryReconciliationRequest {
public DirectoryReconciliationRequest() {
}
public DirectoryReconciliationRequest(String fromNumber, String toNumber, List<String> numbers) {
this.fromNumber = fromNumber;
this.toNumber = toNumber;
this.numbers = numbers;
public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List<String> numbers) {
this.fromUuid = fromUuid;
this.toUuid = toUuid;
this.numbers = numbers;
}
public String getFromNumber() {
return fromNumber;
public UUID getFromUuid() {
return fromUuid;
}
public String getToNumber() {
return toNumber;
public UUID getToUuid() {
return toUuid;
}
public List<String> getNumbers() {

View File

@@ -28,6 +28,9 @@ public class OutgoingMessageEntity {
@JsonProperty
private String source;
@JsonProperty
private UUID sourceUuid;
@JsonProperty
private int sourceDevice;
@@ -44,8 +47,8 @@ public class OutgoingMessageEntity {
public OutgoingMessageEntity(long id, boolean cached,
UUID guid, int type, String relay, long timestamp,
String source, int sourceDevice, byte[] message,
byte[] content, long serverTimestamp)
String source, UUID sourceUuid, int sourceDevice,
byte[] message, byte[] content, long serverTimestamp)
{
this.id = id;
this.cached = cached;
@@ -54,6 +57,7 @@ public class OutgoingMessageEntity {
this.relay = relay;
this.timestamp = timestamp;
this.source = source;
this.sourceUuid = sourceUuid;
this.sourceDevice = sourceDevice;
this.message = message;
this.content = content;
@@ -80,6 +84,10 @@ public class OutgoingMessageEntity {
return source;
}
public UUID getSourceUuid() {
return sourceUuid;
}
public int getSourceDevice() {
return sourceDevice;
}

View File

@@ -21,11 +21,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import javax.security.auth.Subject;
import java.security.Principal;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class Account implements Principal {
@@ -33,6 +36,9 @@ public class Account implements Principal {
static final int MEMCACHE_VERION = 5;
@JsonIgnore
private UUID uuid;
@JsonProperty
private String number;
@JsonProperty
@@ -71,8 +77,9 @@ public class Account implements Principal {
public Account() {}
@VisibleForTesting
public Account(String number, Set<Device> devices, byte[] unidentifiedAccessKey) {
public Account(String number, UUID uuid, Set<Device> devices, byte[] unidentifiedAccessKey) {
this.number = number;
this.uuid = uuid;
this.devices = devices;
this.unidentifiedAccessKey = unidentifiedAccessKey;
}
@@ -85,6 +92,14 @@ public class Account implements Principal {
this.authenticatedDevice = device;
}
public UUID getUuid() {
return uuid;
}
public void setUuid(UUID uuid) {
this.uuid = uuid;
}
public void setNumber(String number) {
this.number = number;
}
@@ -247,6 +262,12 @@ public class Account implements Principal {
this.unrestrictedUnidentifiedAccess = unrestrictedUnidentifiedAccess;
}
public boolean isFor(AmbiguousIdentifier identifier) {
if (identifier.hasUuid()) return identifier.getUuid().equals(uuid);
else if (identifier.hasNumber()) return identifier.getNumber().equals(number);
else throw new AssertionError();
}
// Principal implementation
@Override

View File

@@ -26,6 +26,7 @@ import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@@ -51,7 +52,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
int accountUpdateCount = 0;
for (Account account : chunkAccounts) {
if (needsExplicitRemoval(account)) {
@@ -74,7 +75,7 @@ public class AccountCleaner implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlEnd(Optional<String> fromNumber) {
public void onCrawlEnd(Optional<UUID> fromUuid) {
}
private boolean needsExplicitRemoval(Account account) {

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawler implements Managed, Runnable {
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
@@ -91,6 +92,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
sleepWhileRunning(accelerated ? ACCELERATED_CHUNK_INTERVAL : chunkIntervalMs);
} catch (Throwable t) {
logger.warn("error in database crawl: ", t);
Util.sleep(10000);
}
}
@@ -120,26 +122,26 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
private void processChunk() {
Optional<String> fromNumber = cache.getLastNumber();
Optional<UUID> fromUuid = cache.getLastUuid();
if (!fromNumber.isPresent()) {
listeners.forEach(listener -> { listener.onCrawlStart(); });
if (!fromUuid.isPresent()) {
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
}
List<Account> chunkAccounts = readChunk(fromNumber, chunkSize);
List<Account> chunkAccounts = readChunk(fromUuid, chunkSize);
if (chunkAccounts.isEmpty()) {
listeners.forEach(listener -> { listener.onCrawlEnd(fromNumber); });
cache.setLastNumber(Optional.empty());
listeners.forEach(listener -> listener.onCrawlEnd(fromUuid));
cache.setLastUuid(Optional.empty());
cache.clearAccelerate();
} else {
try {
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.onCrawlChunk(fromNumber, chunkAccounts);
listener.onCrawlChunk(fromUuid, chunkAccounts);
}
cache.setLastNumber(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getNumber()));
cache.setLastUuid(Optional.of(chunkAccounts.get(chunkAccounts.size() - 1).getUuid()));
} catch (AccountDatabaseCrawlerRestartException e) {
cache.setLastNumber(Optional.empty());
cache.setLastUuid(Optional.empty());
cache.clearAccelerate();
}
@@ -147,12 +149,12 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
}
private List<Account> readChunk(Optional<String> fromNumber, int chunkSize) {
private List<Account> readChunk(Optional<UUID> fromUuid, int chunkSize) {
try (Timer.Context timer = readChunkTimer.time()) {
List<Account> chunkAccounts;
if (fromNumber.isPresent()) {
chunkAccounts = accounts.getAllFrom(fromNumber.get(), chunkSize);
if (fromUuid.isPresent()) {
chunkAccounts = accounts.getAllFrom(fromUuid.get(), chunkSize);
} else {
chunkAccounts = accounts.getAllFrom(chunkSize);
}

View File

@@ -1,4 +1,4 @@
/**
/*
* Copyright (C) 2018 Open WhisperSystems
* <p>
* This program is free software: you can redistribute it and/or modify
@@ -18,17 +18,20 @@ package org.whispersystems.textsecuregcm.storage;
import org.whispersystems.textsecuregcm.redis.LuaScript;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import redis.clients.jedis.Jedis;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawlerCache {
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
private static final String LAST_NUMBER_KEY = "account_database_crawler_cache_last_number";
private static final String LAST_UUID_KEY = "account_database_crawler_cache_last_uuid";
private static final String ACCELERATE_KEY = "account_database_crawler_cache_accelerate";
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
@@ -65,18 +68,21 @@ public class AccountDatabaseCrawlerCache {
luaScript.execute(keys, args);
}
public Optional<String> getLastNumber() {
public Optional<UUID> getLastUuid() {
try (Jedis jedis = jedisPool.getWriteResource()) {
return Optional.ofNullable(jedis.get(LAST_NUMBER_KEY));
String lastUuidString = jedis.get(LAST_UUID_KEY);
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
}
}
public void setLastNumber(Optional<String> lastNumber) {
public void setLastUuid(Optional<UUID> lastUuid) {
try (Jedis jedis = jedisPool.getWriteResource()) {
if (lastNumber.isPresent()) {
jedis.psetex(LAST_NUMBER_KEY, LAST_NUMBER_TTL_MS, lastNumber.get());
if (lastUuid.isPresent()) {
jedis.psetex(LAST_UUID_KEY, LAST_NUMBER_TTL_MS, lastUuid.get().toString());
} else {
jedis.del(LAST_NUMBER_KEY);
jedis.del(LAST_UUID_KEY);
}
}
}

View File

@@ -18,9 +18,11 @@ package org.whispersystems.textsecuregcm.storage;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public interface AccountDatabaseCrawlerListener {
void onCrawlStart();
void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
void onCrawlEnd(Optional<String> fromNumber);
void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
void onCrawlEnd(Optional<UUID> fromUuid);
}

View File

@@ -28,24 +28,27 @@ import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
public class Accounts {
public static final String ID = "id";
public static final String UID = "uuid";
public static final String NUMBER = "number";
public static final String DATA = "data";
private static final ObjectMapper mapper = SystemMapper.getMapper();
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer createTimer = metricRegistry.timer(name(Accounts.class, "create"));
private final Timer updateTimer = metricRegistry.timer(name(Accounts.class, "update"));
private final Timer getTimer = metricRegistry.timer(name(Accounts.class, "get"));
private final Timer getAllFromTimer = metricRegistry.timer(name(Accounts.class, "getAllFrom"));
private final Timer createTimer = metricRegistry.timer(name(Accounts.class, "create" ));
private final Timer updateTimer = metricRegistry.timer(name(Accounts.class, "update" ));
private final Timer getByNumberTimer = metricRegistry.timer(name(Accounts.class, "getByNumber" ));
private final Timer getByUuidTimer = metricRegistry.timer(name(Accounts.class, "getByUuid" ));
private final Timer getAllFromTimer = metricRegistry.timer(name(Accounts.class, "getAllFrom" ));
private final Timer getAllFromOffsetTimer = metricRegistry.timer(name(Accounts.class, "getAllFromOffset"));
private final Timer vacuumTimer = metricRegistry.timer(name(Accounts.class, "vacuum"));
private final Timer vacuumTimer = metricRegistry.timer(name(Accounts.class, "vacuum" ));
private final FaultTolerantDatabase database;
@@ -57,16 +60,15 @@ public class Accounts {
public boolean create(Account account) {
return database.with(jdbi -> jdbi.inTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = createTimer.time()) {
int rows = handle.createUpdate("DELETE FROM accounts WHERE " + NUMBER + " = :number")
.bind("number", account.getNumber())
.execute();
UUID uuid = handle.createQuery("INSERT INTO accounts (" + NUMBER + ", " + UID + ", " + DATA + ") VALUES (:number, :uuid, CAST(:data AS json)) ON CONFLICT(number) DO UPDATE SET data = EXCLUDED.data RETURNING uuid")
.bind("number", account.getNumber())
.bind("uuid", account.getUuid())
.bind("data", mapper.writeValueAsString(account))
.mapTo(UUID.class)
.findOnly();
handle.createUpdate("INSERT INTO accounts (" + NUMBER + ", " + DATA + ") VALUES (:number, CAST(:data AS json))")
.bind("number", account.getNumber())
.bind("data", mapper.writeValueAsString(account))
.execute();
return rows == 0;
account.setUuid(uuid);
return uuid.equals(account.getUuid());
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
@@ -76,8 +78,8 @@ public class Accounts {
public void update(Account account) {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context ignored = updateTimer.time()) {
handle.createUpdate("UPDATE accounts SET " + DATA + " = CAST(:data AS json) WHERE " + NUMBER + " = :number")
.bind("number", account.getNumber())
handle.createUpdate("UPDATE accounts SET " + DATA + " = CAST(:data AS json) WHERE " + UID + " = :uuid")
.bind("uuid", account.getUuid())
.bind("data", mapper.writeValueAsString(account))
.execute();
} catch (JsonProcessingException e) {
@@ -88,7 +90,7 @@ public class Accounts {
public Optional<Account> get(String number) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getTimer.time()) {
try (Timer.Context ignored = getByNumberTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " = :number")
.bind("number", number)
.mapTo(Account.class)
@@ -97,10 +99,21 @@ public class Accounts {
}));
}
public List<Account> getAllFrom(String from, int length) {
public Optional<Account> get(UUID uuid) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getByUuidTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " = :uuid")
.bind("uuid", uuid)
.mapTo(Account.class)
.findFirst();
}
}));
}
public List<Account> getAllFrom(UUID from, int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromOffsetTimer.time()) {
return handle.createQuery("SELECT * FROM accounts WHERE " + NUMBER + " > :from ORDER BY " + NUMBER + " LIMIT :limit")
return handle.createQuery("SELECT * FROM accounts WHERE " + UID + " > :from ORDER BY " + UID + " LIMIT :limit")
.bind("from", from)
.bind("limit", length)
.mapTo(Account.class)
@@ -112,7 +125,7 @@ public class Accounts {
public List<Account> getAllFrom(int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromTimer.time()) {
return handle.createQuery("SELECT * FROM accounts ORDER BY " + NUMBER + " LIMIT :limit")
return handle.createQuery("SELECT * FROM accounts ORDER BY " + UID + " LIMIT :limit")
.bind("limit", length)
.mapTo(Account.class)
.list();

View File

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -33,6 +34,7 @@ import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
@@ -40,13 +42,15 @@ import redis.clients.jedis.exceptions.JedisException;
public class AccountsManager {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Timer createTimer = metricRegistry.timer(name(AccountsManager.class, "create" ));
private static final Timer updateTimer = metricRegistry.timer(name(AccountsManager.class, "update" ));
private static final Timer getTimer = metricRegistry.timer(name(AccountsManager.class, "get" ));
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Timer createTimer = metricRegistry.timer(name(AccountsManager.class, "create" ));
private static final Timer updateTimer = metricRegistry.timer(name(AccountsManager.class, "update" ));
private static final Timer getByNumberTimer = metricRegistry.timer(name(AccountsManager.class, "getByNumber"));
private static final Timer getByUuidTimer = metricRegistry.timer(name(AccountsManager.class, "getByUuid" ));
private static final Timer redisSetTimer = metricRegistry.timer(name(AccountsManager.class, "redisSet" ));
private static final Timer redisGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisGet" ));
private static final Timer redisSetTimer = metricRegistry.timer(name(AccountsManager.class, "redisSet" ));
private static final Timer redisNumberGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisNumberGet"));
private static final Timer redisUuidGetTimer = metricRegistry.timer(name(AccountsManager.class, "redisUuidGet" ));
private final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
@@ -65,7 +69,7 @@ public class AccountsManager {
public boolean create(Account account) {
try (Timer.Context context = createTimer.time()) {
boolean freshUser = databaseCreate(account);
redisSet(account.getNumber(), account, false);
redisSet(account);
updateDirectory(account);
return freshUser;
@@ -74,31 +78,51 @@ public class AccountsManager {
public void update(Account account) {
try (Timer.Context context = updateTimer.time()) {
redisSet(account.getNumber(), account, false);
redisSet(account);
databaseUpdate(account);
updateDirectory(account);
}
}
public Optional<Account> get(AmbiguousIdentifier identifier) {
if (identifier.hasNumber()) return get(identifier.getNumber());
else if (identifier.hasUuid()) return get(identifier.getUuid());
else throw new AssertionError();
}
public Optional<Account> get(String number) {
try (Timer.Context context = getTimer.time()) {
try (Timer.Context context = getByNumberTimer.time()) {
Optional<Account> account = redisGet(number);
if (!account.isPresent()) {
account = databaseGet(number);
account.ifPresent(value -> redisSet(number, value, true));
account.ifPresent(value -> redisSet(value));
}
return account;
}
}
public Optional<Account> get(UUID uuid) {
try (Timer.Context context = getByUuidTimer.time()) {
Optional<Account> account = redisGet(uuid);
if (!account.isPresent()) {
account = databaseGet(uuid);
account.ifPresent(value -> redisSet(value));
}
return account;
}
}
public List<Account> getAllFrom(int length) {
return accounts.getAllFrom(length);
}
public List<Account> getAllFrom(String number, int length) {
return accounts.getAllFrom(number, length);
public List<Account> getAllFrom(UUID uuid, int length) {
return accounts.getAllFrom(uuid, length);
}
private void updateDirectory(Account account) {
@@ -111,15 +135,20 @@ public class AccountsManager {
}
}
private String getKey(String number) {
return Account.class.getSimpleName() + Account.MEMCACHE_VERION + number;
private String getAccountMapKey(String number) {
return "AccountMap::" + number;
}
private void redisSet(String number, Account account, boolean optional) {
private String getAccountEntityKey(UUID uuid) {
return "Account::" + uuid.toString();
}
private void redisSet(Account account) {
try (Jedis jedis = cacheClient.getWriteResource();
Timer.Context timer = redisSetTimer.time())
{
jedis.set(getKey(number), mapper.writeValueAsString(account));
jedis.set(getAccountMapKey(account.getNumber()), account.getUuid().toString());
jedis.set(getAccountEntityKey(account.getUuid()), mapper.writeValueAsString(account));
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
@@ -127,20 +156,14 @@ public class AccountsManager {
private Optional<Account> redisGet(String number) {
try (Jedis jedis = cacheClient.getReadResource();
Timer.Context timer = redisGetTimer.time())
Timer.Context timer = redisNumberGetTimer.time())
{
String json = jedis.get(getKey(number));
String uuid = jedis.get(getAccountMapKey(number));
if (json != null) {
Account account = mapper.readValue(json, Account.class);
account.setNumber(number);
return Optional.of(account);
}
return Optional.empty();
} catch (IOException e) {
logger.warn("AccountsManager", "Deserialization error", e);
if (uuid != null) return redisGet(UUID.fromString(uuid));
else return Optional.empty();
} catch (IllegalArgumentException e) {
logger.warn("Deserialization error", e);
return Optional.empty();
} catch (JedisException e) {
logger.warn("Redis failure", e);
@@ -148,10 +171,38 @@ public class AccountsManager {
}
}
private Optional<Account> redisGet(UUID uuid) {
try (Jedis jedis = cacheClient.getReadResource();
Timer.Context timer = redisUuidGetTimer.time())
{
String json = jedis.get(getAccountEntityKey(uuid));
if (json != null) {
Account account = mapper.readValue(json, Account.class);
account.setUuid(uuid);
return Optional.of(account);
}
return Optional.empty();
} catch (IOException e) {
logger.warn("Deserialization error", e);
return Optional.empty();
} catch (JedisException e) {
logger.warn("Redis failure", e);
return Optional.empty();
}
}
private Optional<Account> databaseGet(String number) {
return accounts.get(number);
}
private Optional<Account> databaseGet(UUID uuid) {
return accounts.get(uuid);
}
private boolean databaseCreate(Account account) {
return accounts.create(account);
}

View File

@@ -1,4 +1,4 @@
/**
/*
* Copyright (C) 2018 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
@@ -20,22 +20,22 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import org.whispersystems.textsecuregcm.entities.ActiveUserTally;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.metrics.ReporterFactory;
import redis.clients.jedis.Jedis;
public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
@@ -44,8 +44,6 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
private static final String PLATFORM_IOS = "ios";
private static final String PLATFORM_ANDROID = "android";
private static final String FIRST_FROM_NUMBER = "+";
private static final String INTERVALS[] = {"daily", "weekly", "monthly", "quarterly", "yearly"};
private final MetricsFactory metricsFactory;
@@ -64,7 +62,8 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
}
}
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
@Override
public void onCrawlChunk(Optional<UUID> fromNumber, List<Account> chunkAccounts) {
long nowDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
long agoMs[] = {TimeUnit.DAYS.toMillis(nowDays - 1),
TimeUnit.DAYS.toMillis(nowDays - 7),
@@ -107,47 +106,40 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
}
}
incrementTallies(fromNumber.orElse(FIRST_FROM_NUMBER), platformIncrements, countryIncrements);
incrementTallies(fromNumber.orElse(UUID.randomUUID()), platformIncrements, countryIncrements);
}
public void onCrawlEnd(Optional<String> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
Map<String, long[]> platforms = activeUserTally.getPlatforms();
@Override
public void onCrawlEnd(Optional<UUID> fromNumber) {
MetricRegistry metrics = new MetricRegistry();
long intervalTallies[] = new long[INTERVALS.length];
ActiveUserTally activeUserTally = getFinalTallies();
Map<String, long[]> platforms = activeUserTally.getPlatforms();
platforms.forEach((platform, platformTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = platformTallies[i];
metrics.register(metricKey(platform, INTERVALS[i]),
new Gauge<Long>() {
@Override
public Long getValue() { return tally; }
});
intervalTallies[i] += tally;
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = platformTallies[i];
metrics.register(metricKey(platform, INTERVALS[i]),
(Gauge<Long>) () -> tally);
intervalTallies[i] += tally;
}
});
Map<String, long[]> countries = activeUserTally.getCountries();
countries.forEach((country, countryTallies) -> {
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = countryTallies[i];
metrics.register(metricKey(country, INTERVALS[i]),
new Gauge<Long>() {
@Override
public Long getValue() { return tally; }
});
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long tally = countryTallies[i];
metrics.register(metricKey(country, INTERVALS[i]),
(Gauge<Long>) () -> tally);
}
});
for (int i = 0; i < INTERVALS.length; i++) {
final long intervalTotal = intervalTallies[i];
metrics.register(metricKey(INTERVALS[i]),
new Gauge<Long>() {
@Override
public Long getValue() { return intervalTotal; }
});
(Gauge<Long>) () -> intervalTotal);
}
for (ReporterFactory reporterFactory : metricsFactory.getReporters()) {
reporterFactory.build(metrics).report();
}
@@ -162,22 +154,25 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
return tally;
}
private void incrementTallies(String fromNumber, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
try (Jedis jedis = jedisPool.getWriteResource()) {
String tallyValue = jedis.get(TALLY_KEY);
ActiveUserTally activeUserTally;
if (tallyValue == null) {
activeUserTally = new ActiveUserTally(fromNumber, platformIncrements, countryIncrements);
activeUserTally = new ActiveUserTally(fromUuid, platformIncrements, countryIncrements);
} else {
activeUserTally = mapper.readValue(tallyValue, ActiveUserTally.class);
if (activeUserTally.getFromNumber() != fromNumber) {
activeUserTally.setFromNumber(fromNumber);
if (!fromUuid.equals(activeUserTally.getFromUuid())) {
activeUserTally.setFromUuid(fromUuid);
Map<String, long[]> platformTallies = activeUserTally.getPlatforms();
addTallyMaps(platformTallies, platformIncrements);
Map<String, long[]> countryTallies = activeUserTally.getCountries();
addTallyMaps(countryTallies, countryIncrements);
}
}
jedis.set(TALLY_KEY, mapper.writeValueAsString(activeUserTally));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@@ -188,15 +183,15 @@ public class ActiveUserCounter implements AccountDatabaseCrawlerListener {
private void addTallyMaps(Map<String, long[]> tallyMap, Map<String, long[]> incrementMap) {
incrementMap.forEach((key, increments) -> {
long[] tallies = tallyMap.get(key);
if (tallies == null) {
tallyMap.put(key, increments);
} else {
for (int i = 0; i < INTERVALS.length; i++) {
tallies[i] += increments[i];
}
long[] tallies = tallyMap.get(key);
if (tallies == null) {
tallyMap.put(key, increments);
} else {
for (int i = 0; i < INTERVALS.length; i++) {
tallies[i] += increments[i];
}
});
}
});
}
private ActiveUserTally getFinalTallies() {

View File

@@ -33,6 +33,7 @@ import javax.ws.rs.ProcessingException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
@@ -55,18 +56,16 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
public void onCrawlStart() { }
public void onCrawlEnd(Optional<String> fromNumber) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromNumber.orElse(null), null, Collections.emptyList());
public void onCrawlEnd(Optional<UUID> fromUuid) {
DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null, Collections.emptyList());
DirectoryReconciliationResponse response = sendChunk(request);
}
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
updateDirectoryCache(chunkAccounts);
DirectoryReconciliationRequest request = createChunkRequest(fromNumber, chunkAccounts);
DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts);
DirectoryReconciliationResponse response = sendChunk(request);
if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
throw new AccountDatabaseCrawlerRestartException("directory reconciler missing");
@@ -93,19 +92,19 @@ public class DirectoryReconciler implements AccountDatabaseCrawlerListener {
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private DirectoryReconciliationRequest createChunkRequest(Optional<String> fromNumber, List<Account> accounts) {
private DirectoryReconciliationRequest createChunkRequest(Optional<UUID> fromUuid, List<Account> accounts) {
List<String> numbers = accounts.stream()
.filter(Account::isEnabled)
.map(Account::getNumber)
.collect(Collectors.toList());
Optional<String> toNumber = Optional.empty();
Optional<UUID> toUuid = Optional.empty();
if (!accounts.isEmpty()) {
toNumber = Optional.of(accounts.get(accounts.size() - 1).getNumber());
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
}
return new DirectoryReconciliationRequest(fromNumber.orElse(null), toNumber.orElse(null), numbers);
return new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), numbers);
}
private DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {

View File

@@ -25,6 +25,7 @@ public class Messages {
public static final String TIMESTAMP = "timestamp";
public static final String SERVER_TIMESTAMP = "server_timestamp";
public static final String SOURCE = "source";
public static final String SOURCE_UUID = "source_uuid";
public static final String SOURCE_DEVICE = "source_device";
public static final String DESTINATION = "destination";
public static final String DESTINATION_DEVICE = "destination_device";
@@ -51,8 +52,8 @@ public class Messages {
public void store(UUID guid, Envelope message, String destination, long destinationDevice) {
database.use(jdbi ->jdbi.useHandle(handle -> {
try (Timer.Context ignored = storeTimer.time()) {
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_device, :destination, :destination_device, :message, :content)")
handle.createUpdate("INSERT INTO messages (" + GUID + ", " + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SERVER_TIMESTAMP + ", " + SOURCE + ", " + SOURCE_UUID + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ", " + CONTENT + ") " +
"VALUES (:guid, :type, :relay, :timestamp, :server_timestamp, :source, :source_uuid, :source_device, :destination, :destination_device, :message, :content)")
.bind("guid", guid)
.bind("destination", destination)
.bind("destination_device", destinationDevice)
@@ -61,6 +62,7 @@ public class Messages {
.bind("timestamp", message.getTimestamp())
.bind("server_timestamp", message.getServerTimestamp())
.bind("source", message.hasSource() ? message.getSource() : null)
.bind("source_uuid", message.hasSourceUuid() ? UUID.fromString(message.getSourceUuid()) : null)
.bind("source_device", message.hasSourceDevice() ? message.getSourceDevice() : null)
.bind("message", message.hasLegacyMessage() ? message.getLegacyMessage().toByteArray() : null)
.bind("content", message.hasContent() ? message.getContent().toByteArray() : null)

View File

@@ -203,6 +203,7 @@ public class MessagesCache implements Managed {
envelope.getRelay(),
envelope.getTimestamp(),
envelope.getSource(),
envelope.hasSourceUuid() ? UUID.fromString(envelope.getSourceUuid()) : null,
envelope.getSourceDevice(),
envelope.hasLegacyMessage() ? envelope.getLegacyMessage().toByteArray() : null,
envelope.hasContent() ? envelope.getContent().toByteArray() : null,

View File

@@ -1,18 +0,0 @@
package org.whispersystems.textsecuregcm.storage;
public class PublicAccount extends Account {
public PublicAccount() {}
public PublicAccount(Account account) {
setIdentityKey(account.getIdentityKey());
setUnidentifiedAccessKey(account.getUnidentifiedAccessKey().orElse(null));
setUnrestrictedUnidentifiedAccess(account.isUnrestrictedUnidentifiedAccess());
setAvatar(account.getAvatar());
setProfileName(account.getProfileName());
setPin("******");
account.getDevices().forEach(this::addDevice);
}
}

View File

@@ -9,6 +9,7 @@ import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@@ -31,7 +32,7 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
public void onCrawlStart() {}
@Override
public void onCrawlChunk(Optional<String> fromNumber, List<Account> chunkAccounts) {
public void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
for (Account account : chunkAccounts) {
boolean update = false;
@@ -65,5 +66,5 @@ public class PushFeedbackProcessor implements AccountDatabaseCrawlerListener {
}
@Override
public void onCrawlEnd(Optional<String> fromNumber) {}
public void onCrawlEnd(Optional<UUID> toUuid) {}
}

View File

@@ -10,6 +10,7 @@ import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
public class AccountRowMapper implements RowMapper<Account> {
@@ -20,6 +21,7 @@ public class AccountRowMapper implements RowMapper<Account> {
try {
Account account = mapper.readValue(resultSet.getString(Accounts.DATA), Account.class);
account.setNumber(resultSet.getString(Accounts.NUMBER));
account.setUuid(UUID.fromString(resultSet.getString(Accounts.UID)));
return account;
} catch (IOException e) {
throw new SQLException(e);

View File

@@ -16,6 +16,7 @@ public class OutgoingMessageEntityRowMapper implements RowMapper<OutgoingMessage
int type = resultSet.getInt(Messages.TYPE);
byte[] legacyMessage = resultSet.getBytes(Messages.MESSAGE);
String guid = resultSet.getString(Messages.GUID);
String sourceUuid = resultSet.getString(Messages.SOURCE_UUID);
if (type == Envelope.Type.RECEIPT_VALUE && legacyMessage == null) {
/// XXX - REMOVE AFTER 10/01/15
@@ -29,6 +30,7 @@ public class OutgoingMessageEntityRowMapper implements RowMapper<OutgoingMessage
resultSet.getString(Messages.RELAY),
resultSet.getLong(Messages.TIMESTAMP),
resultSet.getString(Messages.SOURCE),
sourceUuid == null ? null : UUID.fromString(sourceUuid),
resultSet.getInt(Messages.SOURCE_DEVICE),
legacyMessage,
resultSet.getBytes(Messages.CONTENT),