Improve Redis exception handling

This commit is contained in:
Jon Chambers
2021-09-22 10:31:39 -04:00
committed by GitHub
parent 6a71d369e2
commit 98e41f9a37
12 changed files with 49 additions and 407 deletions

View File

@@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.ApnMessage.Type;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.RedisException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -135,27 +134,19 @@ public class ApnFallbackManager implements Managed {
}
}
public void schedule(Account account, Device device) throws RedisException {
public void schedule(Account account, Device device) {
schedule(account, device, System.currentTimeMillis());
}
@VisibleForTesting
void schedule(Account account, Device device, long timestamp) throws RedisException {
try {
sent.mark();
insert(account, device, timestamp + (15 * 1000), (15 * 1000));
} catch (io.lettuce.core.RedisException e) {
throw new RedisException(e);
}
void schedule(Account account, Device device, long timestamp) {
sent.mark();
insert(account, device, timestamp + (15 * 1000), (15 * 1000));
}
public void cancel(Account account, Device device) throws RedisException {
try {
if (remove(account, device)) {
delivered.mark();
}
} catch (io.lettuce.core.RedisException e) {
throw new RedisException(e);
public void cancel(Account account, Device device) {
if (remove(account, device)) {
delivered.mark();
}
}

View File

@@ -7,40 +7,28 @@ package org.whispersystems.textsecuregcm.redis;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.ThreadDumpUtil;
public class FaultTolerantPubSubConnection<K, V> {
private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final Timer executeTimer;
private final Meter commandTimeoutMeter;
private final AtomicBoolean wroteThreadDump = new AtomicBoolean(false);
private static final Logger log = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
public FaultTolerantPubSubConnection(final String name, final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker, final Retry retry) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.circuitBreaker = circuitBreaker;
this.retry = retry;
@@ -48,9 +36,7 @@ public class FaultTolerantPubSubConnection<K, V> {
this.pubSubConnection.setNodeMessagePropagation(true);
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
this.executeTimer = metricRegistry.timer(name(getClass(), name + "-pubsub", "execute"));
this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), name + "-pubsub", "commandTimeout"));
CircuitBreakerUtil.registerMetrics(metricRegistry, circuitBreaker, FaultTolerantPubSubConnection.class);
}
@@ -60,18 +46,13 @@ public class FaultTolerantPubSubConnection<K, V> {
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
try (final Timer.Context ignored = executeTimer.time()) {
consumer.accept(pubSubConnection);
} catch (final RedisCommandTimeoutException e) {
recordCommandTimeout(e);
throw e;
}
}));
} catch (final Throwable t) {
log.warn("Redis operation failure", t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RuntimeException(t);
throw new RedisException(t);
}
}
}
@@ -81,28 +62,14 @@ public class FaultTolerantPubSubConnection<K, V> {
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
try (final Timer.Context ignored = executeTimer.time()) {
return function.apply(pubSubConnection);
} catch (final RedisCommandTimeoutException e) {
recordCommandTimeout(e);
throw e;
}
}));
} catch (final Throwable t) {
log.warn("Redis operation failure", t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RuntimeException(t);
throw new RedisException(t);
}
}
}
private void recordCommandTimeout(final RedisCommandTimeoutException e) {
commandTimeoutMeter.mark();
log.warn("[{}] Command timeout exception ({}-pubsub)", Thread.currentThread().getName(), this.name, e);
if (wroteThreadDump.compareAndSet(false, true)) {
ThreadDumpUtil.writeThreadDump();
}
}
}

View File

@@ -1,118 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import static com.codahale.metrics.MetricRegistry.name;
public class FaultTolerantRedisClient {
private final RedisClient client;
private final StatefulRedisConnection<String, String> stringConnection;
private final StatefulRedisConnection<byte[], byte[]> binaryConnection;
private final CircuitBreaker circuitBreaker;
private final Timer executeTimer;
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisClient.class);
public FaultTolerantRedisClient(final String name, final RedisConfiguration redisConfiguration) {
this(name, RedisClient.create(redisConfiguration.getUrl()), redisConfiguration.getTimeout(), redisConfiguration.getCircuitBreakerConfiguration());
}
@VisibleForTesting
FaultTolerantRedisClient(final String name, final RedisClient redisClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.client = redisClient;
this.client.setDefaultTimeout(commandTimeout);
this.stringConnection = client.connect();
this.binaryConnection = client.connect(ByteArrayCodec.INSTANCE);
this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
CircuitBreakerUtil.registerMetrics(SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME),
circuitBreaker,
FaultTolerantRedisCluster.class);
this.executeTimer = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME).timer(name(getClass(), name, "execute"));
}
@VisibleForTesting
void shutdown() {
stringConnection.close();
client.shutdown();
}
public void useClient(final Consumer<StatefulRedisConnection<String, String>> consumer) {
useConnection(stringConnection, consumer);
}
public <T> T withClient(final Function<StatefulRedisConnection<String, String>, T> function) {
return withConnection(stringConnection, function);
}
public void useBinaryClient(final Consumer<StatefulRedisConnection<byte[], byte[]>> consumer) {
useConnection(binaryConnection, consumer);
}
public <T> T withBinaryClient(final Function<StatefulRedisConnection<byte[], byte[]>, T> function) {
return withConnection(binaryConnection, function);
}
private <K, V> void useConnection(final StatefulRedisConnection<K, V> connection, final Consumer<StatefulRedisConnection<K, V>> consumer) {
try {
circuitBreaker.executeCheckedRunnable(() -> {
try (final Timer.Context ignored = executeTimer.time()) {
consumer.accept(connection);
}
});
} catch (final Throwable t) {
log.warn("Redis operation failure", t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
}
}
private <T, K, V> T withConnection(final StatefulRedisConnection<K, V> connection, final Function<StatefulRedisConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCheckedSupplier(() -> {
try (final Timer.Context ignored = executeTimer.time()) {
return function.apply(connection);
}
});
} catch (final Throwable t) {
log.warn("Redis operation failure", t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
}
}
}

View File

@@ -5,15 +5,12 @@
package org.whispersystems.textsecuregcm.redis;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
@@ -25,18 +22,14 @@ import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.ThreadDumpUtil;
/**
* A fault-tolerant access manager for a Redis cluster. A fault-tolerant Redis cluster provides managed,
@@ -56,11 +49,6 @@ public class FaultTolerantRedisCluster {
private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final Meter commandTimeoutMeter;
private final AtomicBoolean wroteThreadDump = new AtomicBoolean(false);
private static final Logger log = LoggerFactory.getLogger(FaultTolerantRedisCluster.class);
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration, final ClientResources clientResources) {
this(name,
RedisClusterClient.create(clientResources, clusterConfiguration.getUrls().stream().map(RedisURI::create).collect(Collectors.toList())),
@@ -73,9 +61,6 @@ public class FaultTolerantRedisCluster {
FaultTolerantRedisCluster(final String name, final RedisClusterClient clusterClient, final Duration commandTimeout, final CircuitBreakerConfiguration circuitBreakerConfiguration, final RetryConfiguration retryConfiguration) {
this.name = name;
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
this.commandTimeoutMeter = metricRegistry.meter(name(getClass(), this.name, "commandTimeout"));
this.clusterClient = clusterClient;
this.clusterClient.setDefaultTimeout(commandTimeout);
this.clusterClient.setOptions(ClusterClientOptions.builder()
@@ -128,55 +113,28 @@ public class FaultTolerantRedisCluster {
private <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection, final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
try {
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> {
try {
consumer.accept(connection);
} catch (final RedisCommandTimeoutException e) {
recordCommandTimeout(e);
throw e;
}
}));
circuitBreaker.executeCheckedRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection)));
} catch (final Throwable t) {
log.warn("Redis operation failure", t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RuntimeException(t);
throw new RedisException(t);
}
}
}
private <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection, final Function<StatefulRedisClusterConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> {
try {
return function.apply(connection);
} catch (final RedisCommandTimeoutException e) {
recordCommandTimeout(e);
throw e;
}
}));
return circuitBreaker.executeCheckedSupplier(() -> retry.executeCallable(() -> function.apply(connection)));
} catch (final Throwable t) {
log.warn("Redis operation failure", t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RuntimeException(t);
throw new RedisException(t);
}
}
}
private void recordCommandTimeout(final RedisCommandTimeoutException e) {
commandTimeoutMeter.mark();
log.warn("[{}] Command timeout exception ({})", Thread.currentThread().getName(), this.name, e);
if (wroteThreadDump.compareAndSet(false, true)) {
ThreadDumpUtil.writeThreadDump();
}
}
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection);

View File

@@ -1,13 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
/**
 * A checked exception that wraps another exception as its cause.
 */
public class RedisException extends Exception {

  /**
   * Constructs a new exception whose cause is the given exception.
   *
   * @param cause the exception to wrap; passed directly to {@link Exception#Exception(Throwable)}
   */
  public RedisException(final Exception cause) {
    super(cause);
  }
}

View File

@@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.redis;
import io.lettuce.core.RedisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,30 +13,17 @@ public class RedisOperation {
private static final Logger logger = LoggerFactory.getLogger(RedisOperation.class);
public static void unchecked(Operation operation) {
/**
* Executes the given task and logs and discards any {@link RedisException} that may be thrown. This method should be
* used for best-effort tasks like gathering metrics.
*
* @param runnable the Redis-related task to be executed
*/
public static void unchecked(final Runnable runnable) {
try {
operation.run();
runnable.run();
} catch (RedisException e) {
logger.warn("Jedis failure", e);
logger.warn("Redis failure", e);
}
}
public static boolean unchecked(BooleanOperation operation) {
try {
return operation.run();
} catch (RedisException e) {
logger.warn("Jedis failure", e);
}
return false;
}
@FunctionalInterface
public interface Operation {
public void run() throws RedisException;
}
public interface BooleanOperation {
public boolean run() throws RedisException;
}
}

View File

@@ -46,6 +46,8 @@ public class MessagePersister implements Managed {
static final int QUEUE_BATCH_LIMIT = 100;
static final int MESSAGE_BATCH_LIMIT = 100;
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();
private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER";
private static final int WORKER_THREAD_COUNT = 4;
@@ -129,6 +131,8 @@ public class MessagePersister implements Managed {
logger.warn("Failed to persist queue {}::{}; will schedule for retry", accountUuid, deviceId, e);
messagesCache.addQueueToPersist(accountUuid, deviceId);
Util.sleep(EXCEPTION_PAUSE_MILLIS);
}
}
@@ -165,7 +169,7 @@ public class MessagePersister implements Managed {
queueSizeHistogram.update(messageCount);
} finally {
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
messagesCache.unlockQueueForPersistence(accountUuid, deviceId);
}
}
}