Read exclusively from the cache cluster.

This commit is contained in:
Jon Chambers
2020-06-16 10:56:17 -04:00
committed by Jon Chambers
parent 2b109db1b1
commit fc1d88f5bb
14 changed files with 315 additions and 386 deletions

View File

@@ -77,7 +77,6 @@ import org.whispersystems.textsecuregcm.controllers.SecureBackupController;
import org.whispersystems.textsecuregcm.controllers.SecureStorageController;
import org.whispersystems.textsecuregcm.controllers.StickerController;
import org.whispersystems.textsecuregcm.controllers.VoiceVerificationController;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.filters.TimestampResponseFilter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.liquibase.NameableMigrationsBundle;
@@ -291,9 +290,9 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DirectoryQueue directoryQueue = new DirectoryQueue(config.getDirectoryConfiguration().getSqsConfiguration());
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheClient, cacheCluster);
PendingDevicesManager pendingDevicesManager = new PendingDevicesManager(pendingDevices, cacheClient, cacheCluster);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient, cacheCluster, new Experiment("RedisCluster", "AccountsManager"));
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheClient, cacheCluster, new Experiment("RedisCluster", "UsernamesManager"));
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster, new Experiment("RedisCluster", "ProfilesManager"));
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient, cacheCluster);
UsernamesManager usernamesManager = new UsernamesManager(usernames, reservedUsernames, cacheClient, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheClient, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesClient, messages, accountsManager, config.getMessageCacheConfiguration().getPersistDelayMinutes());
MessagesManager messagesManager = new MessagesManager(messages, messagesCache);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
@@ -326,7 +325,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
DirectoryReconciliationClient directoryReconciliationClient = new DirectoryReconciliationClient(config.getDirectoryConfiguration().getDirectoryServerConfiguration());
ActiveUserCounter activeUserCounter = new ActiveUserCounter(config.getMetricsFactory(), cacheClient, cacheCluster, new Experiment("RedisCluster", "ActiveUserCounter"));
ActiveUserCounter activeUserCounter = new ActiveUserCounter(config.getMetricsFactory(), cacheClient, cacheCluster);
DirectoryReconciler directoryReconciler = new DirectoryReconciler(directoryReconciliationClient, directory);
AccountCleaner accountCleaner = new AccountCleaner(accountsManager, directoryQueue);
PushFeedbackProcessor pushFeedbackProcessor = new PushFeedbackProcessor(accountsManager, directoryQueue);

View File

@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -49,7 +48,6 @@ public class RateLimiter {
private final int bucketSize;
private final double leakRatePerMillis;
private final boolean reportLimits;
private final Experiment redisClusterExperiment;
public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name,
int bucketSize, double leakRatePerMinute)
@@ -71,7 +69,6 @@ public class RateLimiter {
this.bucketSize = bucketSize;
this.leakRatePerMillis = leakRatePerMinute / (60.0 * 1000.0);
this.reportLimits = reportLimits;
this.redisClusterExperiment = new Experiment("RedisCluster", "RateLimiter", name);
}
public void validate(String key, int amount) throws RateLimitExceededException {
@@ -114,11 +111,8 @@ public class RateLimiter {
}
private LeakyBucket getBucket(String key) {
try (Jedis jedis = cacheClient.getReadResource()) {
final String bucketName = getBucketName(key);
String serialized = jedis.get(bucketName);
redisClusterExperiment.compareSupplierResult(serialized, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(bucketName)));
try {
final String serialized = cacheCluster.withReadCluster(connection -> connection.sync().get(getBucketName(key)));
if (serialized != null) {
return LeakyBucket.fromSerialized(mapper, serialized);

View File

@@ -20,11 +20,11 @@ import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.LuaScript;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.util.Arrays;
@@ -32,8 +32,6 @@ import java.util.List;
import java.util.Optional;
import java.util.UUID;
import redis.clients.jedis.Jedis;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawlerCache {
@@ -49,8 +47,6 @@ public class AccountDatabaseCrawlerCache {
private final FaultTolerantRedisCluster cacheCluster;
private final LuaScript unlockScript;
private final ClusterLuaScript unlockClusterScript;
private final Experiment isAcceleratedExperiment = new Experiment("RedisCluster", "AccountDatabaseCrawlerCache", "isAccelerated");
private final Experiment getLastUuidExperiment = new Experiment("RedisCluster", "AccountDatabaseCrawlerCache", "getLastUuid");
public AccountDatabaseCrawlerCache(ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) throws IOException {
this.jedisPool = jedisPool;
@@ -67,12 +63,7 @@ public class AccountDatabaseCrawlerCache {
}
public boolean isAccelerated() {
try (Jedis jedis = jedisPool.getWriteResource()) {
final String accelerated = jedis.get(ACCELERATE_KEY);
isAcceleratedExperiment.compareSupplierResult(accelerated, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
return "1".equals(accelerated);
}
return "1".equals(cacheCluster.withReadCluster(connection -> connection.sync().get(ACCELERATE_KEY)));
}
public boolean claimActiveWork(String workerId, long ttlMs) {
@@ -101,13 +92,10 @@ public class AccountDatabaseCrawlerCache {
}
public Optional<UUID> getLastUuid() {
try (Jedis jedis = jedisPool.getWriteResource()) {
String lastUuidString = jedis.get(LAST_UUID_KEY);
getLastUuidExperiment.compareSupplierResult(lastUuidString, () -> cacheCluster.withWriteCluster(connection -> connection.sync().get(LAST_UUID_KEY)));
final String lastUuidString = cacheCluster.withWriteCluster(connection -> connection.sync().get(LAST_UUID_KEY));
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
}
if (lastUuidString == null) return Optional.empty();
else return Optional.of(UUID.fromString(lastUuidString));
}
public void setLastUuid(Optional<UUID> lastUuid) {

View File

@@ -22,13 +22,13 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AmbiguousIdentifier;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -42,7 +42,6 @@ import java.util.UUID;
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
public class AccountsManager {
@@ -63,15 +62,13 @@ public class AccountsManager {
private final FaultTolerantRedisCluster cacheCluster;
private final DirectoryManager directory;
private final ObjectMapper mapper;
private final Experiment redisClusterExperiment;
public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, Experiment redisClusterExperiment) {
public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) {
this.accounts = accounts;
this.directory = directory;
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper();
this.redisClusterExperiment = redisClusterExperiment;
}
public boolean create(Account account) {
@@ -174,37 +171,23 @@ public class AccountsManager {
}
private Optional<Account> redisGet(String number) {
try (Jedis jedis = cacheClient.getReadResource();
Timer.Context ignored = redisNumberGetTimer.time())
{
final String key = getAccountMapKey(number);
try (Timer.Context ignored = redisNumberGetTimer.time()) {
final String uuid = cacheCluster.withReadCluster(connection -> connection.sync().get(getAccountMapKey(number)));
String uuid = jedis.get(key);
redisClusterExperiment.compareSupplierResult(uuid, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(key)));
if (uuid != null) return redisGet(jedis, UUID.fromString(uuid));
if (uuid != null) return redisGet(UUID.fromString(uuid));
else return Optional.empty();
} catch (IllegalArgumentException e) {
logger.warn("Deserialization error", e);
return Optional.empty();
} catch (JedisException e) {
} catch (RedisException e) {
logger.warn("Redis failure", e);
return Optional.empty();
}
}
private Optional<Account> redisGet(UUID uuid) {
try (Jedis jedis = cacheClient.getReadResource()) {
return redisGet(jedis, uuid);
}
}
private Optional<Account> redisGet(Jedis jedis, UUID uuid) {
try (Timer.Context ignored = redisUuidGetTimer.time()) {
final String key = getAccountEntityKey(uuid);
String json = jedis.get(key);
redisClusterExperiment.compareSupplierResult(json, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(key)));
final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(getAccountEntityKey(uuid)));
if (json != null) {
Account account = mapper.readValue(json, Account.class);
@@ -217,7 +200,7 @@ public class AccountsManager {
} catch (IOException e) {
logger.warn("Deserialization error", e);
return Optional.empty();
} catch (JedisException e) {
} catch (RedisException e) {
logger.warn("Redis failure", e);
return Optional.empty();
}

View File

@@ -21,7 +21,6 @@ import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.whispersystems.textsecuregcm.entities.ActiveUserTally;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -52,14 +51,12 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
private final ReplicatedJedisPool jedisPool;
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;
private final Experiment redisClusterExperiment;
public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster, Experiment redisClusterExperiment) {
public ActiveUserCounter(MetricsFactory metricsFactory, ReplicatedJedisPool jedisPool, FaultTolerantRedisCluster cacheCluster) {
this.metricsFactory = metricsFactory;
this.jedisPool = jedisPool;
this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper();
this.redisClusterExperiment = redisClusterExperiment;
}
@Override
@@ -164,8 +161,7 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
private void incrementTallies(UUID fromUuid, Map<String, long[]> platformIncrements, Map<String, long[]> countryIncrements) {
try (Jedis jedis = jedisPool.getWriteResource()) {
String tallyValue = jedis.get(TALLY_KEY);
redisClusterExperiment.compareSupplierResult(tallyValue, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY)));
final String tallyValue = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY));
ActiveUserTally activeUserTally;
@@ -208,9 +204,8 @@ public class ActiveUserCounter extends AccountDatabaseCrawlerListener {
}
private ActiveUserTally getFinalTallies() {
try (Jedis jedis = jedisPool.getReadResource()) {
final String tallyJson = jedis.get(TALLY_KEY);
redisClusterExperiment.compareSupplierResult(tallyJson, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY)));
try {
final String tallyJson = cacheCluster.withReadCluster(connection -> connection.sync().get(TALLY_KEY));
return mapper.readValue(tallyJson, ActiveUserTally.class);
} catch (IOException e) {

View File

@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -41,7 +40,6 @@ public class PendingAccountsManager {
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;
private final Experiment redisClusterExperiment = new Experiment("RedisCluster", "PendingAccountsManager");
public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster)
{
@@ -85,11 +83,8 @@ public class PendingAccountsManager {
}
private Optional<StoredVerificationCode> memcacheGet(String number) {
try (Jedis jedis = cacheClient.getReadResource()) {
final String key = CACHE_PREFIX + number;
String json = jedis.get(key);
redisClusterExperiment.compareSupplierResult(json, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(key)));
try {
final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(CACHE_PREFIX + number));
if (json == null) return Optional.empty();
else return Optional.of(mapper.readValue(json, StoredVerificationCode.class));

View File

@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -41,7 +40,6 @@ public class PendingDevicesManager {
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;
private final Experiment redisClusterExperiment = new Experiment("RedisCluster", "PendingDevicesManager");
public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) {
this.pendingDevices = pendingDevices;
@@ -84,11 +82,8 @@ public class PendingDevicesManager {
}
private Optional<StoredVerificationCode> memcacheGet(String number) {
try (Jedis jedis = cacheClient.getReadResource()) {
final String key = CACHE_PREFIX + number;
String json = jedis.get(key);
redisClusterExperiment.compareSupplierResult(json, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(key)));
try {
final String json = cacheCluster.withReadCluster(connection -> connection.sync().get(CACHE_PREFIX + number));
if (json == null) return Optional.empty();
else return Optional.of(mapper.readValue(json, StoredVerificationCode.class));

View File

@@ -2,9 +2,9 @@ package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.lettuce.core.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -14,7 +14,6 @@ import java.util.Optional;
import java.util.UUID;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
public class ProfilesManager {
@@ -26,14 +25,12 @@ public class ProfilesManager {
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster;
private final ObjectMapper mapper;
private final Experiment redisClusterExperiment;
public ProfilesManager(Profiles profiles, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, Experiment redisClusterExperiment) {
this.profiles = profiles;
public ProfilesManager(Profiles profiles, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) {
this.cacheClient = cacheClient;
this.profiles = profiles;
this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.getMapper();
this.redisClusterExperiment = redisClusterExperiment;
}
public void set(UUID uuid, VersionedProfile versionedProfile) {
@@ -70,18 +67,15 @@ public class ProfilesManager {
}
private Optional<VersionedProfile> memcacheGet(UUID uuid, String version) {
try (Jedis jedis = cacheClient.getReadResource()) {
final String key = CACHE_PREFIX + uuid.toString();
String json = jedis.hget(key, version);
redisClusterExperiment.compareSupplierResult(json, () -> cacheCluster.withReadCluster(connection -> connection.sync().hget(key, version)));
try {
final String json = cacheCluster.withReadCluster(connection -> connection.sync().hget(CACHE_PREFIX + uuid.toString(), version));
if (json == null) return Optional.empty();
else return Optional.of(mapper.readValue(json, VersionedProfile.class));
} catch (IOException e) {
logger.warn("Error deserializing value...", e);
return Optional.empty();
} catch (JedisException e) {
} catch (RedisException e) {
logger.warn("Redis exception", e);
return Optional.empty();
}

View File

@@ -3,10 +3,10 @@ package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -36,14 +36,12 @@ public class UsernamesManager {
private final ReservedUsernames reservedUsernames;
private final ReplicatedJedisPool cacheClient;
private final FaultTolerantRedisCluster cacheCluster;
private final Experiment redisClusterExperiment;
public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, Experiment redisClusterExperiment) {
public UsernamesManager(Usernames usernames, ReservedUsernames reservedUsernames, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) {
this.usernames = usernames;
this.reservedUsernames = reservedUsernames;
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster;
this.redisClusterExperiment = redisClusterExperiment;
}
public boolean put(UUID uuid, String username) {
@@ -116,59 +114,50 @@ public class UsernamesManager {
}
private void redisSet(UUID uuid, String username, boolean required) {
try (Jedis jedis = cacheClient.getWriteResource();
Timer.Context ignored = redisSetTimer.time())
{
final String uuidMapKey = getUuidMapKey(uuid);
final String usernameMapKey = getUsernameMapKey(username);
final Optional<String> maybeOldUsername = Optional.ofNullable(jedis.get(uuidMapKey));
maybeOldUsername.ifPresent(oldUsername -> jedis.del(getUsernameMapKey(oldUsername)));
jedis.set(uuidMapKey, username);
jedis.set(usernameMapKey, uuid.toString());
final String uuidMapKey = getUuidMapKey(uuid);
final String usernameMapKey = getUsernameMapKey(username);
try (Timer.Context ignored = redisSetTimer.time()) {
cacheCluster.useWriteCluster(connection -> {
final RedisAdvancedClusterCommands<String, String> commands = connection.sync();
final Optional<String> maybeOldUsername = Optional.ofNullable(commands.get(uuidMapKey));
maybeOldUsername.ifPresent(oldUsername -> commands.del(getUsernameMapKey(oldUsername)));
commands.set(uuidMapKey, username);
commands.set(usernameMapKey, uuid.toString());
try (final Jedis jedis = cacheClient.getWriteResource()) {
maybeOldUsername.ifPresent(oldUsername -> jedis.del(getUsernameMapKey(oldUsername)));
jedis.set(uuidMapKey, username);
jedis.set(usernameMapKey, uuid.toString());
}
});
} catch (JedisException e) {
} catch (JedisException | RedisException e) {
if (required) throw e;
else logger.warn("Ignoring jedis failure", e);
else logger.warn("Ignoring Redis failure", e);
}
}
private Optional<UUID> redisGet(String username) {
try (Jedis jedis = cacheClient.getReadResource();
Timer.Context ignored = redisUsernameGetTimer.time())
{
final String key = getUsernameMapKey(username);
String result = jedis.get(key);
redisClusterExperiment.compareSupplierResult(result, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(key)));
try (Timer.Context ignored = redisUsernameGetTimer.time()) {
final String result = cacheCluster.withReadCluster(connection -> connection.sync().get(getUsernameMapKey(username)));
if (result == null) return Optional.empty();
else return Optional.of(UUID.fromString(result));
} catch (JedisException e) {
} catch (RedisException e) {
logger.warn("Redis get failure", e);
return Optional.empty();
}
}
private Optional<String> redisGet(UUID uuid) {
try (Jedis jedis = cacheClient.getReadResource();
Timer.Context ignored = redisUuidGetTimer.time())
{
final String key = getUuidMapKey(uuid);
final String result = jedis.get(key);
redisClusterExperiment.compareSupplierResult(result, () -> cacheCluster.withReadCluster(connection -> connection.sync().get(key)));
try (Timer.Context ignored = redisUuidGetTimer.time()) {
final String result = cacheCluster.withReadCluster(connection -> connection.sync().get(getUuidMapKey(uuid)));
return Optional.ofNullable(result);
} catch (JedisException e) {
} catch (RedisException e) {
logger.warn("Redis get failure", e);
return Optional.empty();
}

View File

@@ -1,8 +1,10 @@
package org.whispersystems.textsecuregcm.workers;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
@@ -10,7 +12,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
@@ -25,12 +26,6 @@ import org.whispersystems.textsecuregcm.util.Base64;
import java.security.SecureRandom;
import java.util.Optional;
import java.util.stream.Collectors;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@@ -78,7 +73,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient );
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient, cacheCluster, new Experiment("RedisCluster", "AccountsManager"));
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient, cacheCluster);
for (String user: users) {
Optional<Account> account = accountsManager.get(user);