Introduce a hyper-log-log-based cardinality rate limiter

This commit is contained in:
Jon Chambers
2021-02-11 10:36:26 -05:00
committed by GitHub
parent dcbf285fae
commit e0ed8fa0b8
11 changed files with 235 additions and 39 deletions

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.limits;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.CardinalityRateLimitConfiguration;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import java.time.Duration;
import java.util.Random;
/**
* A cardinality rate limiter prevents an actor from taking some action if that actor has attempted to take that action
* on too many targets in a fixed period of time. Behind the scenes, we estimate the target count using a
* hyper-log-log data structure; as a consequence, the number of targets is an approximation, and this rate limiter
* should not be used in cases where precise time or target limits are required.
*/
public class CardinalityRateLimiter {
private final FaultTolerantRedisCluster cacheCluster;
private final String name;
private final Duration ttl;
private final Duration ttlJitter;
private final int maxCardinality;
private final Random random = new Random();
public CardinalityRateLimiter(final FaultTolerantRedisCluster cacheCluster, final String name, final Duration ttl, final Duration ttlJitter, final int maxCardinality) {
this.cacheCluster = cacheCluster;
this.name = name;
this.ttl = ttl;
this.ttlJitter = ttlJitter;
this.maxCardinality = maxCardinality;
}
public void validate(final String key, final String target) throws RateLimitExceededException {
final String hllKey = getHllKey(key);
final boolean rateLimitExceeded = cacheCluster.withCluster(connection -> {
final boolean changed = connection.sync().pfadd(hllKey, target) == 1;
final long cardinality = connection.sync().pfcount(hllKey);
final boolean mayNeedExpiration = changed && cardinality == 1;
// If the set already existed, we can assume it already had an expiration time and can save a round trip by
// skipping the ttl check.
if (mayNeedExpiration && connection.sync().ttl(hllKey) == -1) {
final long expireSeconds = ttl.plusSeconds(random.nextInt((int) ttlJitter.toSeconds())).toSeconds();
connection.sync().expire(hllKey, expireSeconds);
}
return changed && cardinality > maxCardinality;
});
if (rateLimitExceeded) {
throw new RateLimitExceededException();
}
}
private String getHllKey(final String key) {
return "hll_rate_limit::" + name + "::" + key;
}
public Duration getTtl() {
return ttl;
}
public Duration getTtlJitter() {
return ttlJitter;
}
public int getMaxCardinality() {
return maxCardinality;
}
public boolean hasConfiguration(final CardinalityRateLimitConfiguration configuration) {
return maxCardinality == configuration.getMaxCardinality() &&
ttl.equals(configuration.getTtl()) &&
ttlJitter.equals(configuration.getTtlJitter());
}
}

View File

@@ -12,6 +12,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -113,4 +114,8 @@ public class RateLimiter {
private String getBucketName(String key) {
return "leaky_bucket::" + name + "::" + key;
}
public boolean hasConfiguration(final RateLimitConfiguration configuration) {
return bucketSize == configuration.getBucketSize() && leakRatePerMinute == configuration.getLeakRatePerMinute();
}
}

View File

@@ -5,14 +5,13 @@
package org.whispersystems.textsecuregcm.limits;
import java.util.concurrent.atomic.AtomicReference;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.CardinalityRateLimitConfiguration;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration.RateLimitConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
public class RateLimiters {
private final RateLimiter smsDestinationLimiter;
@@ -38,7 +37,7 @@ public class RateLimiters {
private final RateLimiter usernameLookupLimiter;
private final RateLimiter usernameSetLimiter;
private final AtomicReference<RateLimiter> unsealedSenderLimiter;
private final AtomicReference<CardinalityRateLimiter> unsealedSenderLimiter;
private final AtomicReference<RateLimiter> unsealedIpLimiter;
private final FaultTolerantRedisCluster cacheCluster;
@@ -124,11 +123,11 @@ public class RateLimiters {
this.unsealedIpLimiter = new AtomicReference<>(createUnsealedIpLimiter(cacheCluster, dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp()));
}
public RateLimiter getUnsealedSenderLimiter() {
RateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber();
public CardinalityRateLimiter getUnsealedSenderLimiter() {
CardinalityRateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderNumber();
return this.unsealedSenderLimiter.updateAndGet(rateLimiter -> {
if (isLimiterConfigurationCurrent(rateLimiter, currentConfiguration)) {
if (rateLimiter.hasConfiguration(currentConfiguration)) {
return rateLimiter;
} else {
return createUnsealedSenderLimiter(cacheCluster, currentConfiguration);
@@ -140,7 +139,7 @@ public class RateLimiters {
RateLimitConfiguration currentConfiguration = dynamicConfig.getConfiguration().getLimits().getUnsealedSenderIp();
return this.unsealedIpLimiter.updateAndGet(rateLimiter -> {
if (isLimiterConfigurationCurrent(rateLimiter, currentConfiguration)) {
if (rateLimiter.hasConfiguration(currentConfiguration)) {
return rateLimiter;
} else {
return createUnsealedIpLimiter(cacheCluster, currentConfiguration);
@@ -220,10 +219,8 @@ public class RateLimiters {
return usernameSetLimiter;
}
private RateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster,
RateLimitConfiguration configuration)
{
return createLimiter(cacheCluster, configuration, "unsealedSender");
private CardinalityRateLimiter createUnsealedSenderLimiter(FaultTolerantRedisCluster cacheCluster, CardinalityRateLimitConfiguration configuration) {
return new CardinalityRateLimiter(cacheCluster, "unsealedSender", configuration.getTtl(), configuration.getTtlJitter(), configuration.getMaxCardinality());
}
private RateLimiter createUnsealedIpLimiter(FaultTolerantRedisCluster cacheCluster,
@@ -237,8 +234,4 @@ public class RateLimiters {
configuration.getBucketSize(),
configuration.getLeakRatePerMinute());
}
private boolean isLimiterConfigurationCurrent(RateLimiter limiter, RateLimitConfiguration configuration) {
return limiter.getBucketSize() == configuration.getBucketSize() && limiter.getLeakRatePerMinute() == configuration.getLeakRatePerMinute();
}
}