Decommission the old directory cache.

This commit is contained in:
Jon Chambers
2021-01-11 16:27:04 -05:00
committed by Jon Chambers
parent 9cd121c8f6
commit 71510a8199
20 changed files with 34 additions and 831 deletions

View File

@@ -93,7 +93,6 @@ import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.metrics.TrafficSource;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
import org.whispersystems.textsecuregcm.providers.RedisHealthCheck;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
@@ -119,7 +118,6 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
@@ -301,11 +299,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory pushSchedulerClientFactory = new RedisClientFactory("push_scheduler_cache", config.getPushScheduler().getUrl(), config.getPushScheduler().getReplicaUrls(), config.getPushScheduler().getCircuitBreakerConfiguration());
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
ReplicatedJedisPool pushSchedulerClient = pushSchedulerClientFactory.getRedisClientPool();
ClientResources generalCacheClientResources = ClientResources.builder().build();
@@ -339,7 +335,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExperimentEnrollmentManager experimentEnrollmentManager = new ExperimentEnrollmentManager(dynamicConfigurationManager);
ClientPresenceManager clientPresenceManager = new ClientPresenceManager(clientPresenceCluster, recurringJobExecutor, keyspaceNotificationDispatchExecutor);
DirectoryManager directory = new DirectoryManager(directoryClient);
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheCluster);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, cacheCluster);
@@ -348,7 +343,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesCache messagesCache = new MessagesCache(messagesCluster, messagesCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, experimentEnrollmentManager);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
@@ -385,7 +380,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
accountDatabaseCrawlerListeners.add(new ActiveUserCounter(config.getMetricsFactory(), cacheCluster));
for (DirectoryServerConfiguration directoryServerConfiguration : config.getDirectoryConfiguration().getDirectoryServerConfiguration()) {
final DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(directoryServerConfiguration);
final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryServerConfiguration.isReplicationPrimary(), directoryReconciliationClient, directory);
final DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryServerConfiguration.getReplicationName(), directoryReconciliationClient);
accountDatabaseCrawlerListeners.add(directoryReconciler);
}
accountDatabaseCrawlerListeners.add(new AccountCleaner(accountsManager));
@@ -441,7 +436,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, usernamesManager, abusiveHostRules, rateLimiters, smsSender, directoryQueue, messagesManager, turnTokenGenerator, config.getTestDevices(), recaptchaClient, gcmSender, apnSender, backupCredentialsGenerator));
environment.jersey().register(new DeviceController(pendingDevicesManager, accountsManager, messagesManager, directoryQueue, rateLimiters, config.getMaxDevices()));
environment.jersey().register(new DirectoryController(rateLimiters, directory, directoryCredentialsGenerator));
environment.jersey().register(new DirectoryController(directoryCredentialsGenerator));
environment.jersey().register(new ProvisioningController(rateLimiters, provisioningManager));
environment.jersey().register(new CertificateController(new CertificateGenerator(config.getDeliveryCertificate().getCertificate(), config.getDeliveryCertificate().getPrivateKey(), config.getDeliveryCertificate().getExpiresDays()), zkAuthOperations, isZkEnabled));
environment.jersey().register(new VoiceVerificationController(config.getVoiceVerificationConfiguration().getUrl(), config.getVoiceVerificationConfiguration().getLocales()));
@@ -498,7 +493,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
///
environment.healthChecks().register("directory", new RedisHealthCheck(directoryClient));
environment.healthChecks().register("cacheCluster", new RedisClusterHealthCheck(cacheCluster));
environment.metrics().register(name(CpuUsageGauge.class, "cpu"), new CpuUsageGauge(3, TimeUnit.SECONDS));

View File

@@ -12,11 +12,6 @@ import java.util.List;
public class DirectoryConfiguration {
@JsonProperty
@NotNull
@Valid
private RedisConfiguration redis;
@JsonProperty
@NotNull
@Valid
@@ -32,10 +27,6 @@ public class DirectoryConfiguration {
@Valid
private List<DirectoryServerConfiguration> server;
public RedisConfiguration getRedisConfiguration() {
return redis;
}
public SqsConfiguration getSqsConfiguration() {
return sqs;
}

View File

@@ -13,9 +13,6 @@ public class DirectoryServerConfiguration {
@JsonProperty
private String replicationName;
@JsonProperty
private boolean replicationPrimary;
@NotEmpty
@JsonProperty
private String replicationUrl;
@@ -32,10 +29,6 @@ public class DirectoryServerConfiguration {
return replicationName;
}
public boolean isReplicationPrimary() {
return replicationPrimary;
}
public String getReplicationUrl() {
return replicationUrl;
}

View File

@@ -35,12 +35,6 @@ public class RateLimitsConfiguration {
@JsonProperty
private RateLimitConfiguration attachments = new RateLimitConfiguration(50, 50);
@JsonProperty
private RateLimitConfiguration contactQueries = new RateLimitConfiguration(50000, 50000);
@JsonProperty
private RateLimitConfiguration contactIpQueries = new RateLimitConfiguration(200, (100.0 / 60.0));
@JsonProperty
private RateLimitConfiguration prekeys = new RateLimitConfiguration(3, 1.0 / 10.0);
@@ -88,14 +82,6 @@ public class RateLimitsConfiguration {
return prekeys;
}
public RateLimitConfiguration getContactQueries() {
return contactQueries;
}
public RateLimitConfiguration getContactIpQueries() {
return contactIpQueries;
}
public RateLimitConfiguration getAttachments() {
return attachments;
}

View File

@@ -4,92 +4,28 @@
*/
package org.whispersystems.textsecuregcm.controllers;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.auth.Auth;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.entities.ClientContactTokens;
import org.whispersystems.textsecuregcm.entities.ClientContacts;
import org.whispersystems.textsecuregcm.entities.DirectoryFeedbackRequest;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.util.Base64;
import org.whispersystems.textsecuregcm.util.Constants;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.auth.Auth;
@Path("/v1/directory")
public class DirectoryController {
private static final String[] FEEDBACK_STATUSES = {
"ok",
"mismatch",
"attestation-error",
"unexpected-error",
};
private static final String[] FRONTED_REGIONS = {"+20", "+971", "+968", "+974"};
private final Logger logger = LoggerFactory.getLogger(DirectoryController.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Histogram contactsHistogram = metricRegistry.histogram(name(getClass(), "contacts"));
private final Meter contactsMeter = metricRegistry.meter(name(getClass(), "contactRate"));
private final Map<String, Meter> iosFeedbackMeters = new HashMap<String, Meter>() {{
for (String status : FEEDBACK_STATUSES) {
put(status, metricRegistry.meter(name(DirectoryController.class, "feedback-v2", "ios", status)));
}
}};
private final Map<String, Meter> androidFeedbackMeters = new HashMap<String, Meter>() {{
for (String status : FEEDBACK_STATUSES) {
put(status, metricRegistry.meter(name(DirectoryController.class, "feedback-v2", "android", status)));
}
}};
private final Map<String, Meter> unknownFeedbackMeters = new HashMap<String, Meter>() {{
for (String status : FEEDBACK_STATUSES) {
put(status, metricRegistry.meter(name(DirectoryController.class, "feedback-v2", "unknown", status)));
}
}};
private final RateLimiters rateLimiters;
private final DirectoryManager directory;
private final ExternalServiceCredentialGenerator directoryServiceTokenGenerator;
public DirectoryController(RateLimiters rateLimiters,
DirectoryManager directory,
ExternalServiceCredentialGenerator userTokenGenerator)
{
this.directory = directory;
this.rateLimiters = rateLimiters;
public DirectoryController(ExternalServiceCredentialGenerator userTokenGenerator) {
this.directoryServiceTokenGenerator = userTokenGenerator;
}
@@ -105,29 +41,8 @@ public class DirectoryController {
@Path("/feedback-v3/{status}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response setFeedback(@Auth Account account,
@PathParam("status") String status,
@Valid DirectoryFeedbackRequest request)
{
Map<String, Meter> platformFeedbackMeters = unknownFeedbackMeters;
Optional<Device> masterDevice = account.getMasterDevice();
if (masterDevice.isPresent()) {
if (masterDevice.get().getApnId() != null) {
platformFeedbackMeters = iosFeedbackMeters;
} else if (masterDevice.get().getGcmId() != null) {
platformFeedbackMeters = androidFeedbackMeters;
}
}
Optional<Meter> meter = Optional.ofNullable(platformFeedbackMeters.get(status));
if (meter.isPresent()) {
meter.get().mark();
return Response.ok().build();
} else {
return Response.status(Status.NOT_FOUND).build();
}
public Response setFeedback(@Auth Account account) {
return Response.ok().build();
}
@@ -135,9 +50,7 @@ public class DirectoryController {
@GET
@Path("/{token}")
@Produces(MediaType.APPLICATION_JSON)
public Response getTokenPresence(@Auth Account account, @PathParam("token") String token)
throws RateLimitExceededException
{
public Response getTokenPresence(@Auth Account account) {
return Response.status(429).build();
}
@@ -146,15 +59,7 @@ public class DirectoryController {
@Path("/tokens")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Response getContactIntersection(@Auth Account account,
@HeaderParam("X-Forwarded-For") String forwardedFor,
@Valid ClientContactTokens contacts)
throws RateLimitExceededException
{
public Response getContactIntersection(@Auth Account account) {
return Response.status(429).build();
}
private byte[] decodeToken(String encoded) throws IOException {
return Base64.decodeWithoutPadding(encoded.replace('-', '+').replace('_', '/'));
}
}

View File

@@ -1,96 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
import java.util.Arrays;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class ClientContact {
@JsonSerialize(using = ByteArrayAdapter.Serializing.class)
@JsonDeserialize(using = ByteArrayAdapter.Deserializing.class)
@JsonProperty
private byte[] token;
@JsonProperty
private boolean voice;
@JsonProperty
private boolean video;
private String relay;
private boolean inactive;
public ClientContact(byte[] token, String relay, boolean voice, boolean video) {
this.token = token;
this.relay = relay;
this.voice = voice;
this.video = video;
}
public ClientContact() {}
public byte[] getToken() {
return token;
}
public String getRelay() {
return relay;
}
public void setRelay(String relay) {
this.relay = relay;
}
public boolean isInactive() {
return inactive;
}
public void setInactive(boolean inactive) {
this.inactive = inactive;
}
public boolean isVoice() {
return voice;
}
public void setVoice(boolean voice) {
this.voice = voice;
}
public boolean isVideo() {
return video;
}
public void setVideo(boolean video) {
this.video = video;
}
@Override
public boolean equals(Object other) {
if (other == null) return false;
if (!(other instanceof ClientContact)) return false;
ClientContact that = (ClientContact)other;
return
Arrays.equals(this.token, that.token) &&
this.inactive == that.inactive &&
this.voice == that.voice &&
this.video == that.video &&
(this.relay == null ? (that.relay == null) : this.relay.equals(that.relay));
}
public int hashCode() {
return Arrays.hashCode(this.token);
}
}

View File

@@ -1,28 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
import java.util.List;
public class ClientContactTokens {
@NotNull
@JsonProperty
private List<String> contacts;
public List<String> getContacts() {
return contacts;
}
public ClientContactTokens() {}
public ClientContactTokens(List<String> contacts) {
this.contacts = contacts;
}
}

View File

@@ -1,29 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.LinkedList;
import java.util.List;
public class ClientContacts {
@JsonProperty
private List<ClientContact> contacts;
public ClientContacts(List<ClientContact> contacts) {
if (contacts != null) this.contacts = contacts;
else this.contacts = new LinkedList<>();
}
public ClientContacts() {
this.contacts = new LinkedList<>();
}
public List<ClientContact> getContacts() {
return contacts;
}
}

View File

@@ -1,31 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Size;
import javax.validation.valueextraction.Unwrapping;
import java.util.Optional;
public class DirectoryFeedbackRequest {
@Size(max = 1024, payload = {Unwrapping.Unwrap.class})
@JsonProperty
private Optional<String> reason;
public DirectoryFeedbackRequest() {
}
public DirectoryFeedbackRequest(Optional<String> reason) {
this.reason = reason;
}
public Optional<String> getReason() {
return reason;
}
}

View File

@@ -25,8 +25,6 @@ public class RateLimiters {
private final RateLimiter pinLimiter;
private final RateLimiter attachmentLimiter;
private final RateLimiter contactsLimiter;
private final RateLimiter contactsIpLimiter;
private final RateLimiter preKeysLimiter;
private final RateLimiter messagesLimiter;
@@ -86,14 +84,6 @@ public class RateLimiters {
config.getAttachments().getBucketSize(),
config.getAttachments().getLeakRatePerMinute());
this.contactsLimiter = new RateLimiter(cacheCluster, "contactsQuery",
config.getContactQueries().getBucketSize(),
config.getContactQueries().getLeakRatePerMinute());
this.contactsIpLimiter = new RateLimiter(cacheCluster, "contactsIpQuery",
config.getContactIpQueries().getBucketSize(),
config.getContactIpQueries().getLeakRatePerMinute());
this.preKeysLimiter = new RateLimiter(cacheCluster, "prekeys",
config.getPreKeys().getBucketSize(),
config.getPreKeys().getLeakRatePerMinute());
@@ -174,14 +164,6 @@ public class RateLimiters {
return preKeysLimiter;
}
public RateLimiter getContactsLimiter() {
return contactsLimiter;
}
public RateLimiter getContactsIpLimiter() {
return contactsIpLimiter;
}
public RateLimiter getAttachmentLimiter() {
return this.attachmentLimiter;
}

View File

@@ -16,7 +16,6 @@ import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -53,7 +52,6 @@ public class AccountsManager {
private final Accounts accounts;
private final FaultTolerantRedisCluster cacheCluster;
private final DirectoryManager directory;
private final DirectoryQueue directoryQueue;
private final KeysDynamoDb keysDynamoDb;
private final MessagesManager messagesManager;
@@ -73,9 +71,8 @@ public class AccountsManager {
}
}
public AccountsManager(Accounts accounts, DirectoryManager directory, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue, final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager, final ProfilesManager profilesManager) {
public AccountsManager(Accounts accounts, FaultTolerantRedisCluster cacheCluster, final DirectoryQueue directoryQueue, final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager, final ProfilesManager profilesManager) {
this.accounts = accounts;
this.directory = directory;
this.cacheCluster = cacheCluster;
this.directoryQueue = directoryQueue;
this.keysDynamoDb = keysDynamoDb;
@@ -89,7 +86,6 @@ public class AccountsManager {
try (Timer.Context ignored = createTimer.time()) {
boolean freshUser = databaseCreate(account);
redisSet(account);
updateDirectory(account);
return freshUser;
}
@@ -99,7 +95,6 @@ public class AccountsManager {
try (Timer.Context ignored = updateTimer.time()) {
redisSet(account);
databaseUpdate(account);
updateDirectory(account);
}
}
@@ -148,7 +143,6 @@ public class AccountsManager {
try (final Timer.Context ignored = deleteTimer.time()) {
usernamesManager.delete(account.getUuid());
directoryQueue.deleteAccount(account);
directory.remove(account.getNumber());
profilesManager.deleteAll(account.getUuid());
keysDynamoDb.delete(account);
messagesManager.clear(account.getNumber(), account.getUuid());
@@ -162,16 +156,6 @@ public class AccountsManager {
.increment();
}
private void updateDirectory(Account account) {
if (account.isEnabled()) {
byte[] token = Util.getContactToken(account.getNumber());
ClientContact clientContact = new ClientContact(token, null, true, true);
directory.add(clientContact);
} else {
directory.remove(account.getNumber());
}
}
private String getAccountMapKey(String number) {
return "AccountMap::" + number;
}

View File

@@ -1,204 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.IterablePair;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class DirectoryManager {
private final Logger logger = LoggerFactory.getLogger(DirectoryManager.class);
private static final byte[] DIRECTORY_KEY = {'d', 'i', 'r', 'e', 'c', 't', 'o', 'r', 'y'};
private final ObjectMapper objectMapper;
private final ReplicatedJedisPool redisPool;
public DirectoryManager(ReplicatedJedisPool redisPool) {
this.redisPool = redisPool;
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
public void remove(String number) {
remove(Util.getContactToken(number));
}
public void remove(BatchOperationHandle handle, String number) {
remove(handle, Util.getContactToken(number));
}
public void remove(byte[] token) {
try (Jedis jedis = redisPool.getWriteResource()) {
jedis.hdel(DIRECTORY_KEY, token);
}
}
public void remove(BatchOperationHandle handle, byte[] token) {
Pipeline pipeline = handle.pipeline;
pipeline.hdel(DIRECTORY_KEY, token);
}
public void add(ClientContact contact) {
TokenValue tokenValue = new TokenValue(contact.getRelay(), contact.isVoice(), contact.isVideo());
try (Jedis jedis = redisPool.getWriteResource()) {
jedis.hset(DIRECTORY_KEY, contact.getToken(), objectMapper.writeValueAsBytes(tokenValue));
} catch (JsonProcessingException e) {
logger.warn("JSON Serialization", e);
}
}
public void add(BatchOperationHandle handle, ClientContact contact) {
try {
Pipeline pipeline = handle.pipeline;
TokenValue tokenValue = new TokenValue(contact.getRelay(), contact.isVoice(), contact.isVideo());
pipeline.hset(DIRECTORY_KEY, contact.getToken(), objectMapper.writeValueAsBytes(tokenValue));
} catch (JsonProcessingException e) {
logger.warn("JSON Serialization", e);
}
}
public PendingClientContact get(BatchOperationHandle handle, byte[] token) {
Pipeline pipeline = handle.pipeline;
return new PendingClientContact(objectMapper, token, pipeline.hget(DIRECTORY_KEY, token));
}
public Optional<ClientContact> get(byte[] token) {
try (Jedis jedis = redisPool.getWriteResource()) {
byte[] result = jedis.hget(DIRECTORY_KEY, token);
if (result == null) {
return Optional.empty();
}
TokenValue tokenValue = objectMapper.readValue(result, TokenValue.class);
return Optional.of(new ClientContact(token, tokenValue.relay, tokenValue.voice, tokenValue.video));
} catch (IOException e) {
logger.warn("JSON Error", e);
return Optional.empty();
}
}
public List<ClientContact> get(List<byte[]> tokens) {
try (Jedis jedis = redisPool.getWriteResource()) {
Pipeline pipeline = jedis.pipelined();
List<Response<byte[]>> futures = new LinkedList<>();
List<ClientContact> results = new LinkedList<>();
try {
for (byte[] token : tokens) {
futures.add(pipeline.hget(DIRECTORY_KEY, token));
}
} finally {
pipeline.sync();
}
IterablePair<byte[], Response<byte[]>> lists = new IterablePair<>(tokens, futures);
for (Pair<byte[], Response<byte[]>> pair : lists) {
try {
if (pair.second().get() != null) {
TokenValue tokenValue = objectMapper.readValue(pair.second().get(), TokenValue.class);
ClientContact clientContact = new ClientContact(pair.first(), tokenValue.relay, tokenValue.voice, tokenValue.video);
results.add(clientContact);
}
} catch (IOException e) {
logger.warn("Deserialization Problem: ", e);
}
}
return results;
}
}
public BatchOperationHandle startBatchOperation() {
Jedis jedis = redisPool.getWriteResource();
return new BatchOperationHandle(jedis, jedis.pipelined());
}
public void stopBatchOperation(BatchOperationHandle handle) {
Pipeline pipeline = handle.pipeline;
Jedis jedis = handle.jedis;
pipeline.sync();
jedis.close();
}
public static class BatchOperationHandle {
public final Pipeline pipeline;
public final Jedis jedis;
public BatchOperationHandle(Jedis jedis, Pipeline pipeline) {
this.pipeline = pipeline;
this.jedis = jedis;
}
}
private static class TokenValue {
@JsonProperty(value = "r")
private String relay;
@JsonProperty(value = "v")
private boolean voice;
@JsonProperty(value = "w")
private boolean video;
public TokenValue() {}
public TokenValue(String relay, boolean voice, boolean video) {
this.relay = relay;
this.voice = voice;
this.video = video;
}
}
public static class PendingClientContact {
private final ObjectMapper objectMapper;
private final byte[] token;
private final Response<byte[]> response;
PendingClientContact(ObjectMapper objectMapper, byte[] token, Response<byte[]> response) {
this.objectMapper = objectMapper;
this.token = token;
this.response = response;
}
public Optional<ClientContact> get() throws IOException {
byte[] result = response.get();
if (result == null) {
return Optional.empty();
}
TokenValue tokenValue = objectMapper.readValue(result, TokenValue.class);
return Optional.of(new ClientContact(token, tokenValue.relay, tokenValue.voice, tokenValue.video));
}
}
}

View File

@@ -10,15 +10,11 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationResponse;
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import javax.ws.rs.ProcessingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -32,17 +28,11 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
private static final Logger logger = LoggerFactory.getLogger(DirectoryReconciler.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final String name;
private final boolean primary;
private final DirectoryManager directoryManager;
private final DirectoryReconciliationClient reconciliationClient;
private final Timer sendChunkTimer;
private final Meter sendChunkErrorMeter;
public DirectoryReconciler(String name, boolean primary, DirectoryReconciliationClient reconciliationClient, DirectoryManager directoryManager) {
this.name = name;
this.primary = primary;
this.directoryManager = directoryManager;
public DirectoryReconciler(String name, DirectoryReconciliationClient reconciliationClient) {
this.reconciliationClient = reconciliationClient;
sendChunkTimer = metricRegistry.timer(name(DirectoryReconciler.class, name, "sendChunk"));
sendChunkErrorMeter = metricRegistry.meter(name(DirectoryReconciler.class, name, "sendChunkError"));
@@ -59,11 +49,6 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
@Override
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
if (primary) {
updateDirectoryCache(chunkAccounts);
}
DirectoryReconciliationRequest request = createChunkRequest(fromUuid, chunkAccounts);
DirectoryReconciliationResponse response = sendChunk(request);
if (response.getStatus() == DirectoryReconciliationResponse.Status.MISSING) {
@@ -71,25 +56,6 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
}
}
private void updateDirectoryCache(List<Account> accounts) {
BatchOperationHandle batchOperation = directoryManager.startBatchOperation();
try {
for (Account account : accounts) {
if (account.isEnabled() && account.isDiscoverableByPhoneNumber()) {
byte[] token = Util.getContactToken(account.getNumber());
ClientContact clientContact = new ClientContact(token, null, true, true);
directoryManager.add(batchOperation, clientContact);
} else {
directoryManager.remove(batchOperation, account.getNumber());
}
}
} finally {
directoryManager.stopBatchOperation(batchOperation);
}
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private DirectoryReconciliationRequest createChunkRequest(Optional<UUID> fromUuid, List<Account> accounts) {
List<DirectoryReconciliationRequest.User> users = new ArrayList<>(accounts.size());

View File

@@ -23,14 +23,11 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.sqs.DirectoryQueue;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
@@ -121,18 +118,16 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
KeysDynamoDb keysDynamoDb = new KeysDynamoDb(preKeysDynamoDb, configuration.getKeysDynamoDbConfiguration().getTableName());
Messages messages = new Messages(messageDatabase);
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, configuration.getMessageDynamoDbConfiguration().getTableName(), configuration.getMessageDynamoDbConfiguration().getTimeToLive());
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
FaultTolerantRedisCluster messageInsertCacheCluster = new FaultTolerantRedisCluster("message_insert_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster messageReadDeleteCluster = new FaultTolerantRedisCluster("message_read_delete_cluster", configuration.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClusterClientResources);
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", configuration.getMetricsClusterConfiguration(), redisClusterClientResources);
MessagesCache messagesCache = new MessagesCache(messageInsertCacheCluster, messageReadDeleteCluster, keyspaceNotificationDispatchExecutor);
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient );
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesDynamoDb, messagesCache, pushLatencyManager, new ExperimentEnrollmentManager(dynamicConfigurationManager));
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
for (String user: users) {
Optional<Account> account = accountsManager.get(user);