diff --git a/service/pom.xml b/service/pom.xml
index 496fd8252..5969580d0 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -445,6 +445,12 @@
netty-codec-haproxy
+
+ io.netty
+ netty-transport-native-epoll
+ linux-x86_64
+
+
org.glassfish.jersey.test-framework
jersey-test-framework-core
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java
index 6b747d0cb..d57ab65f3 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java
@@ -69,14 +69,17 @@ public class FaultTolerantRedisClient {
redisUri.setLibraryVersion(null);
this.redisClient = RedisClient.create(clientResourcesBuilder.build(), redisUri);
- this.redisClient.setOptions(ClientOptions.builder()
+ final ClientOptions.Builder clientOptionsBuilder = ClientOptions.builder()
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
// for asynchronous commands
.timeoutOptions(TimeoutOptions.builder()
.fixedTimeout(commandTimeout)
.build())
- .publishOnScheduler(true)
- .build());
+ .publishOnScheduler(true);
+
+ NettyUtil.setSocketTimeoutsIfApplicable(clientOptionsBuilder);
+
+ this.redisClient.setOptions(clientOptionsBuilder.build());
this.stringConnection = redisClient.connect();
this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE);
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java
index 520c88897..e0fba6ef7 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java
@@ -89,7 +89,8 @@ public class FaultTolerantRedisClusterClient {
clientResourcesBuilder.nettyCustomizer(lettuceShardCircuitBreaker).
build(),
redisUris);
- this.clusterClient.setOptions(ClusterClientOptions.builder()
+
+ final ClusterClientOptions.Builder clusterClientOptionsBuilder = ClusterClientOptions.builder()
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.validateClusterNodeMembership(false)
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
@@ -99,8 +100,11 @@ public class FaultTolerantRedisClusterClient {
.timeoutOptions(TimeoutOptions.builder()
.fixedTimeout(commandTimeout)
.build())
- .publishOnScheduler(true)
- .build());
+ .publishOnScheduler(true);
+
+ NettyUtil.setSocketTimeoutsIfApplicable(clusterClientOptionsBuilder);
+
+ this.clusterClient.setOptions(clusterClientOptionsBuilder.build());
this.stringConnection = clusterClient.connect();
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java
new file mode 100644
index 000000000..6e9e52764
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/NettyUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2025 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+
+package org.whispersystems.textsecuregcm.redis;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.SocketOptions;
+import io.lettuce.core.resource.EpollProvider;
+import java.time.Duration;
+
+public class NettyUtil {
+ static final Duration TCP_KEEPALIVE_IDLE = Duration.ofSeconds(30);
+ static final Duration TCP_KEEPALIVE_INTERVAL = Duration.ofSeconds(30);
+ static final Duration TCP_USER_TIMEOUT = Duration.ofSeconds(30);
+
+ static void setSocketTimeoutsIfApplicable(final ClientOptions.Builder clientOptionsBuilder) {
+ if (EpollProvider.isAvailable()) {
+ // These socket options are only available with epoll native transport.
+ clientOptionsBuilder.socketOptions(SocketOptions.builder()
+ .keepAlive(SocketOptions.KeepAliveOptions.builder()
+ .interval(TCP_KEEPALIVE_INTERVAL)
+ .idle(TCP_KEEPALIVE_IDLE)
+ .enable()
+ .build())
+ .tcpUserTimeout(SocketOptions.TcpUserTimeoutOptions.builder()
+ .enable()
+ .tcpUserTimeout(TCP_USER_TIMEOUT)
+ .build())
+ .build());
+ }
+ }
+}
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java
index 0f019851f..5d1d9cbe5 100644
--- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java
+++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java
@@ -332,12 +332,12 @@ class FaultTolerantRedisClusterClientTest {
final LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler =
ctx.channel().pipeline().get(LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler.class);
- urisToChannelBreakers.computeIfAbsent(getRedisURI(ctx.channel()), ignored -> new HashSet<>())
+ urisToChannelBreakers.computeIfAbsent(getRedisURI(remoteAddress), ignored -> new HashSet<>())
.add(channelCircuitBreakerHandler);
}
- private static RedisURI getRedisURI(Channel channel) {
- final InetSocketAddress inetAddress = (InetSocketAddress) channel.remoteAddress();
+ private static RedisURI getRedisURI(SocketAddress remoteAddress) {
+ final InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress;
return RedisURI.create(inetAddress.getHostString(), inetAddress.getPort());
}