From b2dd3151774dcb5309c6f33b62f63bf52412e7bb Mon Sep 17 00:00:00 2001 From: Katherine Date: Mon, 8 Sep 2025 12:37:03 -0400 Subject: [PATCH] Set TCP timeout on Redis clients --- service/pom.xml | 6 ++++ .../redis/FaultTolerantRedisClient.java | 9 +++-- .../FaultTolerantRedisClusterClient.java | 10 ++++-- .../textsecuregcm/redis/NettyUtil.java | 34 +++++++++++++++++++ .../FaultTolerantRedisClusterClientTest.java | 6 ++-- 5 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java diff --git a/service/pom.xml b/service/pom.xml index 496fd8252..5969580d0 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -445,6 +445,12 @@ netty-codec-haproxy + + io.netty + netty-transport-native-epoll + linux-x86_64 + + org.glassfish.jersey.test-framework jersey-test-framework-core diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java index 6b747d0cb..d57ab65f3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java @@ -69,14 +69,17 @@ public class FaultTolerantRedisClient { redisUri.setLibraryVersion(null); this.redisClient = RedisClient.create(clientResourcesBuilder.build(), redisUri); - this.redisClient.setOptions(ClientOptions.builder() + final ClientOptions.Builder clientOptionsBuilder = ClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) // for asynchronous commands .timeoutOptions(TimeoutOptions.builder() .fixedTimeout(commandTimeout) .build()) - .publishOnScheduler(true) - .build()); + .publishOnScheduler(true); + + NettyUtil.setSocketTimeoutsIfApplicable(clientOptionsBuilder); + + this.redisClient.setOptions(clientOptionsBuilder.build()); this.stringConnection = redisClient.connect(); this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index 520c88897..e0fba6ef7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -89,7 +89,8 @@ public class FaultTolerantRedisClusterClient { clientResourcesBuilder.nettyCustomizer(lettuceShardCircuitBreaker). build(), redisUris); - this.clusterClient.setOptions(ClusterClientOptions.builder() + + final ClusterClientOptions.Builder clusterClientOptionsBuilder = ClusterClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .validateClusterNodeMembership(false) .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() @@ -99,8 +100,11 @@ public class FaultTolerantRedisClusterClient { .timeoutOptions(TimeoutOptions.builder() .fixedTimeout(commandTimeout) .build()) - .publishOnScheduler(true) - .build()); + .publishOnScheduler(true); + + NettyUtil.setSocketTimeoutsIfApplicable(clusterClientOptionsBuilder); + + this.clusterClient.setOptions(clusterClientOptionsBuilder.build()); this.stringConnection = clusterClient.connect(); this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java new file mode 100644 index 000000000..6e9e52764 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java @@ -0,0 +1,34 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.redis; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.SocketOptions; +import io.lettuce.core.resource.EpollProvider; +import java.time.Duration; + +public class NettyUtil { + static final Duration TCP_KEEPALIVE_IDLE = Duration.ofSeconds(30); + static final Duration TCP_KEEPALIVE_INTERVAL = Duration.ofSeconds(30); + static final Duration TCP_USER_TIMEOUT = Duration.ofSeconds(30); + + static void setSocketTimeoutsIfApplicable(final ClientOptions.Builder clientOptionsBuilder) { + if (EpollProvider.isAvailable()) { + // These socket options are only available with epoll native transport. + clientOptionsBuilder.socketOptions(SocketOptions.builder() + .keepAlive(SocketOptions.KeepAliveOptions.builder() + .interval(TCP_KEEPALIVE_INTERVAL) + .idle(TCP_KEEPALIVE_IDLE) + .enable() + .build()) + .tcpUserTimeout(SocketOptions.TcpUserTimeoutOptions.builder() + .enable() + .tcpUserTimeout(TCP_USER_TIMEOUT) + .build()) + .build()); + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java index 0f019851f..5d1d9cbe5 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java @@ -332,12 +332,12 @@ class FaultTolerantRedisClusterClientTest { final LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler = ctx.channel().pipeline().get(LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler.class); - urisToChannelBreakers.computeIfAbsent(getRedisURI(ctx.channel()), ignored -> new HashSet<>()) + urisToChannelBreakers.computeIfAbsent(getRedisURI(remoteAddress), ignored -> new HashSet<>()) .add(channelCircuitBreakerHandler); } - private static RedisURI getRedisURI(Channel channel) { - final InetSocketAddress inetAddress = (InetSocketAddress) channel.remoteAddress(); + private static RedisURI getRedisURI(SocketAddress remoteAddress) { + final InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress; return RedisURI.create(inetAddress.getHostString(), inetAddress.getPort()); }