Mirror writes to the cache cluster.

This commit is contained in:
Jon Chambers
2020-06-06 12:23:05 -04:00
committed by Jon Chambers
parent fe1054d58a
commit 1388103919
16 changed files with 337 additions and 183 deletions

View File

@@ -3,7 +3,9 @@ package org.whispersystems.textsecuregcm.limits;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import io.lettuce.core.SetArgs;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -14,8 +16,8 @@ public class LockingRateLimiter extends RateLimiter {
private final Meter meter;
public LockingRateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) {
super(cacheClient, name, bucketSize, leakRatePerMinute);
public LockingRateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name, int bucketSize, double leakRatePerMinute) {
super(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute);
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
this.meter = metricRegistry.meter(name(getClass(), name, "locked"));
@@ -42,13 +44,21 @@ public class LockingRateLimiter extends RateLimiter {
private void releaseLock(String key) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(getLockName(key));
final String lockName = getLockName(key);
jedis.del(lockName);
cacheCluster.useWriteCluster(connection -> connection.async().del(lockName));
}
}
private boolean acquireLock(String key) {
try (Jedis jedis = cacheClient.getWriteResource()) {
return jedis.set(getLockName(key), "L", "NX", "EX", 10) != null;
final String lockName = getLockName(key);
final boolean acquiredLock = jedis.set(lockName, "L", "NX", "EX", 10) != null;
cacheCluster.useWriteCluster(connection -> connection.async().set(lockName, "L", SetArgs.Builder.nx().ex(10)));
return acquiredLock;
}
}

View File

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -38,20 +39,21 @@ public class RateLimiter {
private final Logger logger = LoggerFactory.getLogger(RateLimiter.class);
private final ObjectMapper mapper = SystemMapper.getMapper();
private final Meter meter;
protected final ReplicatedJedisPool cacheClient;
protected final String name;
private final int bucketSize;
private final double leakRatePerMillis;
private final boolean reportLimits;
private final Meter meter;
protected final ReplicatedJedisPool cacheClient;
protected final FaultTolerantRedisCluster cacheCluster;
protected final String name;
private final int bucketSize;
private final double leakRatePerMillis;
private final boolean reportLimits;
public RateLimiter(ReplicatedJedisPool cacheClient, String name,
public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name,
int bucketSize, double leakRatePerMinute)
{
this(cacheClient, name, bucketSize, leakRatePerMinute, false);
this(cacheClient, cacheCluster, name, bucketSize, leakRatePerMinute, false);
}
public RateLimiter(ReplicatedJedisPool cacheClient, String name,
public RateLimiter(ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster, String name,
int bucketSize, double leakRatePerMinute,
boolean reportLimits)
{
@@ -59,6 +61,7 @@ public class RateLimiter {
this.meter = metricRegistry.meter(name(getClass(), name, "exceeded"));
this.cacheClient = cacheClient;
this.cacheCluster = cacheCluster;
this.name = name;
this.bucketSize = bucketSize;
this.leakRatePerMillis = leakRatePerMinute / (60.0 * 1000.0);
@@ -82,14 +85,21 @@ public class RateLimiter {
public void clear(String key) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(getBucketName(key));
final String bucketName = getBucketName(key);
jedis.del(bucketName);
cacheCluster.useWriteCluster(connection -> connection.async().del(bucketName));
}
}
private void setBucket(String key, LeakyBucket bucket) {
try (Jedis jedis = cacheClient.getWriteResource()) {
String serialized = bucket.serialize(mapper);
jedis.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized);
final String bucketName = getBucketName(key);
final String serialized = bucket.serialize(mapper);
final int level = (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000);
jedis.setex(bucketName, level, serialized);
cacheCluster.useWriteCluster(connection -> connection.async().setex(bucketName, level, serialized));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}

View File

@@ -18,6 +18,7 @@ package org.whispersystems.textsecuregcm.limits;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
public class RateLimiters {
@@ -47,84 +48,84 @@ public class RateLimiters {
private final RateLimiter usernameLookupLimiter;
private final RateLimiter usernameSetLimiter;
public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient) {
this.smsDestinationLimiter = new RateLimiter(cacheClient, "smsDestination",
public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient, FaultTolerantRedisCluster cacheCluster) {
this.smsDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "smsDestination",
config.getSmsDestination().getBucketSize(),
config.getSmsDestination().getLeakRatePerMinute());
this.voiceDestinationLimiter = new RateLimiter(cacheClient, "voxDestination",
this.voiceDestinationLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestination",
config.getVoiceDestination().getBucketSize(),
config.getVoiceDestination().getLeakRatePerMinute());
this.voiceDestinationDailyLimiter = new RateLimiter(cacheClient, "voxDestinationDaily",
this.voiceDestinationDailyLimiter = new RateLimiter(cacheClient, cacheCluster, "voxDestinationDaily",
config.getVoiceDestinationDaily().getBucketSize(),
config.getVoiceDestinationDaily().getLeakRatePerMinute());
this.smsVoiceIpLimiter = new RateLimiter(cacheClient, "smsVoiceIp",
this.smsVoiceIpLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoiceIp",
config.getSmsVoiceIp().getBucketSize(),
config.getSmsVoiceIp().getLeakRatePerMinute());
this.smsVoicePrefixLimiter = new RateLimiter(cacheClient, "smsVoicePrefix",
this.smsVoicePrefixLimiter = new RateLimiter(cacheClient, cacheCluster, "smsVoicePrefix",
config.getSmsVoicePrefix().getBucketSize(),
config.getSmsVoicePrefix().getLeakRatePerMinute());
this.autoBlockLimiter = new RateLimiter(cacheClient, "autoBlock",
this.autoBlockLimiter = new RateLimiter(cacheClient, cacheCluster, "autoBlock",
config.getAutoBlock().getBucketSize(),
config.getAutoBlock().getLeakRatePerMinute());
this.verifyLimiter = new LockingRateLimiter(cacheClient, "verify",
this.verifyLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "verify",
config.getVerifyNumber().getBucketSize(),
config.getVerifyNumber().getLeakRatePerMinute());
this.pinLimiter = new LockingRateLimiter(cacheClient, "pin",
this.pinLimiter = new LockingRateLimiter(cacheClient, cacheCluster, "pin",
config.getVerifyPin().getBucketSize(),
config.getVerifyPin().getLeakRatePerMinute());
this.attachmentLimiter = new RateLimiter(cacheClient, "attachmentCreate",
this.attachmentLimiter = new RateLimiter(cacheClient, cacheCluster, "attachmentCreate",
config.getAttachments().getBucketSize(),
config.getAttachments().getLeakRatePerMinute());
this.contactsLimiter = new RateLimiter(cacheClient, "contactsQuery",
this.contactsLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsQuery",
config.getContactQueries().getBucketSize(),
config.getContactQueries().getLeakRatePerMinute());
this.contactsIpLimiter = new RateLimiter(cacheClient, "contactsIpQuery",
this.contactsIpLimiter = new RateLimiter(cacheClient, cacheCluster, "contactsIpQuery",
config.getContactIpQueries().getBucketSize(),
config.getContactIpQueries().getLeakRatePerMinute());
this.preKeysLimiter = new RateLimiter(cacheClient, "prekeys",
this.preKeysLimiter = new RateLimiter(cacheClient, cacheCluster, "prekeys",
config.getPreKeys().getBucketSize(),
config.getPreKeys().getLeakRatePerMinute());
this.messagesLimiter = new RateLimiter(cacheClient, "messages",
this.messagesLimiter = new RateLimiter(cacheClient, cacheCluster, "messages",
config.getMessages().getBucketSize(),
config.getMessages().getLeakRatePerMinute());
this.allocateDeviceLimiter = new RateLimiter(cacheClient, "allocateDevice",
this.allocateDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "allocateDevice",
config.getAllocateDevice().getBucketSize(),
config.getAllocateDevice().getLeakRatePerMinute());
this.verifyDeviceLimiter = new RateLimiter(cacheClient, "verifyDevice",
this.verifyDeviceLimiter = new RateLimiter(cacheClient, cacheCluster, "verifyDevice",
config.getVerifyDevice().getBucketSize(),
config.getVerifyDevice().getLeakRatePerMinute());
this.turnLimiter = new RateLimiter(cacheClient, "turnAllocate",
this.turnLimiter = new RateLimiter(cacheClient, cacheCluster, "turnAllocate",
config.getTurnAllocations().getBucketSize(),
config.getTurnAllocations().getLeakRatePerMinute());
this.profileLimiter = new RateLimiter(cacheClient, "profile",
this.profileLimiter = new RateLimiter(cacheClient, cacheCluster, "profile",
config.getProfile().getBucketSize(),
config.getProfile().getLeakRatePerMinute());
this.stickerPackLimiter = new RateLimiter(cacheClient, "stickerPack",
this.stickerPackLimiter = new RateLimiter(cacheClient, cacheCluster, "stickerPack",
config.getStickerPack().getBucketSize(),
config.getStickerPack().getLeakRatePerMinute());
this.usernameLookupLimiter = new RateLimiter(cacheClient, "usernameLookup",
this.usernameLookupLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameLookup",
config.getUsernameLookup().getBucketSize(),
config.getUsernameLookup().getLeakRatePerMinute());
this.usernameSetLimiter = new RateLimiter(cacheClient, "usernameSet",
this.usernameSetLimiter = new RateLimiter(cacheClient, cacheCluster, "usernameSet",
config.getUsernameSet().getBucketSize(),
config.getUsernameSet().getLeakRatePerMinute());
}