mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 05:08:03 +01:00
Rename CircuitBreakerUtil to ResilienceUtil
This commit is contained in:
committed by
Jon Chambers
parent
807e03ca2b
commit
bc20aee7c9
@@ -257,7 +257,7 @@ import org.whispersystems.textsecuregcm.subscriptions.BraintreeManager;
|
||||
import org.whispersystems.textsecuregcm.subscriptions.GooglePlayBillingManager;
|
||||
import org.whispersystems.textsecuregcm.subscriptions.StripeManager;
|
||||
import org.whispersystems.textsecuregcm.util.BufferingInterceptor;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ManagedAwsCrt;
|
||||
import org.whispersystems.textsecuregcm.util.ManagedExecutors;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
@@ -365,12 +365,12 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
UncaughtExceptionHandler.register();
|
||||
|
||||
config.getCircuitBreakerConfigurations().forEach((name, configuration) ->
|
||||
CircuitBreakerUtil.getCircuitBreakerRegistry().addConfiguration(name, configuration.toCircuitBreakerConfig()));
|
||||
ResilienceUtil.getCircuitBreakerRegistry().addConfiguration(name, configuration.toCircuitBreakerConfig()));
|
||||
|
||||
config.getRetryConfigurations().forEach((name, configuration) ->
|
||||
CircuitBreakerUtil.getRetryRegistry().addConfiguration(name, configuration.toRetryConfigBuilder().build()));
|
||||
ResilienceUtil.getRetryRegistry().addConfiguration(name, configuration.toRetryConfigBuilder().build()));
|
||||
|
||||
CircuitBreakerUtil.setGeneralRedisRetryConfiguration(config.getGeneralRedisRetryConfiguration());
|
||||
ResilienceUtil.setGeneralRedisRetryConfiguration(config.getGeneralRedisRetryConfiguration());
|
||||
|
||||
ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder.of(environment, "dynamicConfiguration")
|
||||
.threads(1).build();
|
||||
|
||||
@@ -35,7 +35,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.UUIDUtil;
|
||||
|
||||
/**
|
||||
@@ -188,7 +188,7 @@ public class DisconnectionRequestManager extends RedisPubSubAdapter<byte[], byte
|
||||
.addAllDeviceIds(deviceIds.stream().mapToInt(Byte::intValue).boxed().toList())
|
||||
.build();
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> pubSubClient.withBinaryConnection(connection ->
|
||||
connection.async().publish(DISCONNECTION_REQUEST_CHANNEL, disconnectionRequest.toByteArray()))
|
||||
.toCompletableFuture())
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import io.github.resilience4j.retry.RetryConfig;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
@@ -30,7 +29,7 @@ import java.util.stream.IntStream;
|
||||
import javax.annotation.Nullable;
|
||||
import org.glassfish.jersey.SslConfigurator;
|
||||
import org.whispersystems.textsecuregcm.util.CertificateUtil;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
|
||||
public class FaultTolerantHttpClient {
|
||||
@@ -211,8 +210,8 @@ public class FaultTolerantHttpClient {
|
||||
if (retryExecutor != null) {
|
||||
final RetryConfig.Builder<HttpResponse<?>> retryConfigBuilder =
|
||||
RetryConfig.from(Optional.ofNullable(retryConfigurationName)
|
||||
.flatMap(name -> CircuitBreakerUtil.getRetryRegistry().getConfiguration(name))
|
||||
.orElseGet(() -> CircuitBreakerUtil.getRetryRegistry().getDefaultConfig()));
|
||||
.flatMap(name -> ResilienceUtil.getRetryRegistry().getConfiguration(name))
|
||||
.orElseGet(() -> ResilienceUtil.getRetryRegistry().getDefaultConfig()));
|
||||
|
||||
retryConfigBuilder.retryOnResult(response -> response.statusCode() >= 500);
|
||||
|
||||
@@ -220,7 +219,7 @@ public class FaultTolerantHttpClient {
|
||||
retryConfigBuilder.retryOnException(retryOnException);
|
||||
}
|
||||
|
||||
retry = CircuitBreakerUtil.getRetryRegistry().retry(name + "-retry", retryConfigBuilder.build());
|
||||
retry = ResilienceUtil.getRetryRegistry().retry(name + "-retry", retryConfigBuilder.build());
|
||||
} else {
|
||||
retry = null;
|
||||
}
|
||||
@@ -228,8 +227,8 @@ public class FaultTolerantHttpClient {
|
||||
final String circuitBreakerName = name + "-breaker";
|
||||
|
||||
final CircuitBreaker circuitBreaker = circuitBreakerConfigurationName != null
|
||||
? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName)
|
||||
: CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName);
|
||||
? ResilienceUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName)
|
||||
: ResilienceUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName);
|
||||
|
||||
return new FaultTolerantHttpClient(httpClients, requestTimeout, retryExecutor, retry, circuitBreaker);
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
@@ -135,13 +135,13 @@ public class DynamicRateLimiter implements RateLimiter {
|
||||
|
||||
@Override
|
||||
public void clear(final String key) {
|
||||
CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeRunnable(() -> cluster.useCluster(connection -> connection.sync().del(bucketName(name, key))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> clearAsync(final String key) {
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> cluster.withCluster(connection -> connection.async().del(bucketName(name, key)))
|
||||
.thenRun(Util.NOOP));
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
|
||||
public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> implements Managed {
|
||||
|
||||
@@ -80,7 +80,7 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
|
||||
.setContent(ByteString.copyFrom(body))
|
||||
.build();
|
||||
|
||||
final boolean receiverPresent = CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
final boolean receiverPresent = ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeSupplier(() -> pubSubClient.withBinaryConnection(connection ->
|
||||
connection.sync().publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0));
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
@@ -236,7 +236,7 @@ public class PushNotificationScheduler implements Managed {
|
||||
public CompletableFuture<Void> scheduleDelayedNotification(final Account account, final Device device, final Duration minDelay) {
|
||||
final long deliveryTime = clock.instant().plus(minDelay).toEpochMilli();
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> pushSchedulingCluster.withCluster(connection ->
|
||||
connection.async().zadd(getDelayedNotificationQueueKey(account, device),
|
||||
deliveryTime,
|
||||
|
||||
@@ -17,7 +17,7 @@ import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
|
||||
public class FaultTolerantRedisClient {
|
||||
|
||||
@@ -40,8 +40,8 @@ public class FaultTolerantRedisClient {
|
||||
RedisUriUtil.createRedisUriWithTimeout(redisConfiguration.getUri(), redisConfiguration.getTimeout()),
|
||||
redisConfiguration.getTimeout(),
|
||||
redisConfiguration.getCircuitBreakerConfigurationName() != null
|
||||
? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(name + "-breaker", redisConfiguration.getCircuitBreakerConfigurationName())
|
||||
: CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(name + "-breaker"));
|
||||
? ResilienceUtil.getCircuitBreakerRegistry().circuitBreaker(name + "-breaker", redisConfiguration.getCircuitBreakerConfigurationName())
|
||||
: ResilienceUtil.getCircuitBreakerRegistry().circuitBreaker(name + "-breaker"));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
||||
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
|
||||
/**
|
||||
* Adds a circuit breaker to every Netty {@link Channel} that gets created, so that a single unhealthy shard does not
|
||||
@@ -105,8 +105,8 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer {
|
||||
SHARD_ADDRESS_TAG_NAME, shardAddress);
|
||||
|
||||
breaker = circuitBreakerConfigurationName != null
|
||||
? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName, tags)
|
||||
: CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, tags);
|
||||
? ResilienceUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName, tags)
|
||||
: ResilienceUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -85,7 +85,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
|
||||
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.RegistrationIdValidator;
|
||||
@@ -522,7 +522,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
|
||||
CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection ->
|
||||
connection.async().set(key, deviceInfoJson, SetArgs.Builder.ex(RECENTLY_ADDED_DEVICE_TTL))))
|
||||
.whenComplete((_, pubSubThrowable) -> {
|
||||
@@ -1427,7 +1427,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
}
|
||||
|
||||
private void redisDelete(final Account account) {
|
||||
CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME).executeRunnable(() ->
|
||||
ResilienceUtil.getGeneralRedisRetry(RETRY_NAME).executeRunnable(() ->
|
||||
redisDeleteTimer.record(() ->
|
||||
cacheCluster.useCluster(connection ->
|
||||
connection.sync().del(getAccountMapKey(account.getPhoneNumberIdentifier().toString()),
|
||||
@@ -1442,7 +1442,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
getAccountEntityKey(account.getUuid())
|
||||
};
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME).executeCompletionStage(retryExecutor,
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME).executeCompletionStage(retryExecutor,
|
||||
() -> cacheCluster.withCluster(connection -> connection.async().del(keysToDelete))
|
||||
.thenRun(Util.NOOP))
|
||||
.toCompletableFuture()
|
||||
@@ -1601,7 +1601,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
// We validate the request object so this should never happen
|
||||
.orElseThrow(() -> new AssertionError("No creation timestamp or registration ID provided")));
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection -> connection.async()
|
||||
.set(key, transferArchiveJson, SetArgs.Builder.ex(RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL)))
|
||||
.toCompletableFuture())
|
||||
@@ -1662,7 +1662,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection ->
|
||||
connection.async().set(key, requestJson, SetArgs.Builder.ex(RESTORE_ACCOUNT_REQUEST_TTL)))
|
||||
.toCompletableFuture())
|
||||
|
||||
@@ -48,7 +48,7 @@ import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
|
||||
import reactor.core.observability.micrometer.Micrometer;
|
||||
@@ -440,7 +440,7 @@ public class MessagesCache {
|
||||
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
||||
.collectList()
|
||||
.publishOn(messageDeliveryScheduler)))
|
||||
.transformDeferred(RetryOperator.of(CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)))
|
||||
.transformDeferred(RetryOperator.of(ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)))
|
||||
.<MessageProtos.Envelope>handle((mrmDataAndView, sink) -> {
|
||||
try {
|
||||
assert mrmDataAndView.size() == 2;
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.List;
|
||||
import java.util.UUID;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
@@ -40,7 +40,7 @@ class MessagesCacheGetItemsScript {
|
||||
);
|
||||
//noinspection unchecked
|
||||
return getItemsScript.executeBinaryReactive(keys, args)
|
||||
.transformDeferred(RetryOperator.of(CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)))
|
||||
.transformDeferred(RetryOperator.of(ResilienceUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)))
|
||||
.map(result -> (List<byte[]>) result)
|
||||
.next();
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ import org.whispersystems.textsecuregcm.push.NewMessageAvailableEvent;
|
||||
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
|
||||
/**
|
||||
* Inserts an envelope into the message queue for a destination device and publishes a "new message available" event.
|
||||
@@ -69,7 +69,7 @@ class MessagesCacheInsertScript {
|
||||
NEW_MESSAGE_EVENT_BYTES // eventPayload
|
||||
));
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> insertScript.executeBinaryAsync(keys, args))
|
||||
.thenApply(result -> (boolean) result);
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
/**
|
||||
@@ -55,7 +55,7 @@ class MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript {
|
||||
}
|
||||
});
|
||||
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> script.executeBinaryAsync(keys, args))
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
|
||||
/**
|
||||
* Removes a list of message GUIDs from the queue of a destination device.
|
||||
@@ -44,7 +44,7 @@ class MessagesCacheRemoveByGuidScript {
|
||||
.toList();
|
||||
|
||||
//noinspection unchecked
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor, () -> removeByGuidScript.executeBinaryAsync(keys, args))
|
||||
.thenApply(result -> (List<byte[]>) result);
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent;
|
||||
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
@@ -39,7 +39,7 @@ class MessagesCacheUnlockQueueScript {
|
||||
RedisMessageAvailabilityManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey
|
||||
);
|
||||
|
||||
CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
ResilienceUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
|
||||
.executeRunnable(() -> unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ import io.micrometer.core.instrument.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -190,7 +190,7 @@ public class ProfilesManager {
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> redisDelete(UUID uuid) {
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeCompletionStage(retryExecutor,
|
||||
() -> cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid))))
|
||||
.toCompletableFuture()
|
||||
|
||||
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
|
||||
/**
|
||||
* Register Apple DeviceCheck App Attestations and verify the corresponding assertions.
|
||||
@@ -117,7 +117,7 @@ public class AppleDeviceCheckManager {
|
||||
|
||||
final String redisChallengeKey = challengeKey(ChallengeType.ATTEST, account.getUuid());
|
||||
|
||||
@Nullable final String challenge = CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
@Nullable final String challenge = ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeSupplier(() -> redisClient.withCluster(cluster -> cluster.sync().get(redisChallengeKey)));
|
||||
|
||||
if (challenge == null) {
|
||||
@@ -176,7 +176,7 @@ public class AppleDeviceCheckManager {
|
||||
throws ChallengeNotFoundException, DeviceCheckVerificationFailedException, DeviceCheckKeyIdNotFoundException, RequestReuseException {
|
||||
|
||||
final String redisChallengeKey = challengeKey(challengeType, account.getUuid());
|
||||
@Nullable final String storedChallenge = CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
@Nullable final String storedChallenge = ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeSupplier(() -> redisClient.withCluster(cluster -> cluster.sync().get(redisChallengeKey)));
|
||||
|
||||
if (storedChallenge == null) {
|
||||
@@ -221,7 +221,7 @@ public class AppleDeviceCheckManager {
|
||||
final UUID accountIdentifier = account.getUuid();
|
||||
|
||||
final String challengeKey = challengeKey(challengeType, accountIdentifier);
|
||||
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
return ResilienceUtil.getGeneralRedisRetry(RETRY_NAME)
|
||||
.executeSupplier(() -> redisClient.withCluster(cluster -> {
|
||||
final RedisAdvancedClusterCommands<String, String> commands = cluster.sync();
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.PaymentTime;
|
||||
import org.whispersystems.textsecuregcm.storage.SubscriptionException;
|
||||
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ResilienceUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
|
||||
/**
|
||||
@@ -112,12 +112,12 @@ public class AppleAppStoreManager implements SubscriptionPaymentProcessor {
|
||||
|
||||
final RetryConfig.Builder<HttpResponse<?>> retryConfigBuilder =
|
||||
RetryConfig.from(Optional.ofNullable(retryConfigurationName)
|
||||
.flatMap(name -> CircuitBreakerUtil.getRetryRegistry().getConfiguration(name))
|
||||
.orElseGet(() -> CircuitBreakerUtil.getRetryRegistry().getDefaultConfig()));
|
||||
.flatMap(name -> ResilienceUtil.getRetryRegistry().getConfiguration(name))
|
||||
.orElseGet(() -> ResilienceUtil.getRetryRegistry().getDefaultConfig()));
|
||||
|
||||
retryConfigBuilder.retryOnException(AppleAppStoreManager::shouldRetry);
|
||||
|
||||
this.retry = CircuitBreakerUtil.getRetryRegistry().retry("appstore-retry", retryConfigBuilder.build());
|
||||
this.retry = ResilienceUtil.getRetryRegistry().retry("appstore-retry", retryConfigBuilder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -16,7 +16,7 @@ import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
|
||||
|
||||
public class CircuitBreakerUtil {
|
||||
public class ResilienceUtil {
|
||||
|
||||
private static final CircuitBreakerRegistry CIRCUIT_BREAKER_REGISTRY =
|
||||
CircuitBreakerRegistry.of(new CircuitBreakerConfiguration().toCircuitBreakerConfig());
|
||||
Reference in New Issue
Block a user