diff --git a/pom.xml b/pom.xml
index 6d4ee2c4c..0b1c6d688 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,13 +70,13 @@
2.3.20
1.5.32
2.0.12
- 6.8.2.RELEASE
+ 7.5.1.RELEASE
9.0.21
8.1
2.25.4
3.5.0
1.16.4
- 4.1.127.Final
+ 4.2.13.Final
4.33.2
diff --git a/service/config/sample.yml b/service/config/sample.yml
index 10818ef58..9ceaafc61 100644
--- a/service/config/sample.yml
+++ b/service/config/sample.yml
@@ -517,6 +517,7 @@ idlePrimaryDeviceReminder:
grpc:
port: 50051
+ websocketPort: 8080
asnTable:
s3Region: a-region
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 2c6b2654d..30a969b12 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -31,16 +31,23 @@ import io.lettuce.core.metrics.MicrometerOptions;
import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalServerChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslContext;
import io.netty.resolver.ResolvedAddressTypes;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.util.Mapping;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.Filter;
import jakarta.servlet.ServletRegistration;
import java.io.ByteArrayInputStream;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.http.HttpClient;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
@@ -164,7 +171,10 @@ import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor;
import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor;
import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer;
-import org.whispersystems.textsecuregcm.grpc.net.ManagedNioEventLoopGroup;
+import org.whispersystems.textsecuregcm.grpc.net.ManagedEventLoopGroup;
+import org.whispersystems.textsecuregcm.grpc.net.OmnibusH2Server;
+import org.whispersystems.textsecuregcm.grpc.net.OmnibusRouter;
+import org.whispersystems.textsecuregcm.grpc.net.SniMapper;
import org.whispersystems.textsecuregcm.jetty.JettyHttpConfigurationCustomizer;
import org.whispersystems.textsecuregcm.keytransparency.KeyTransparencyServiceClient;
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
@@ -314,6 +324,7 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
+import javax.annotation.Nullable;
public class WhisperServerService extends Application {
@@ -619,8 +630,8 @@ public class WhisperServerService extends Application dnsResolutionEventLoopGroup = new ManagedEventLoopGroup<>(new NioEventLoopGroup());
+ final DnsNameResolver cloudflareDnsResolver = new DnsNameResolverBuilder(dnsResolutionEventLoopGroup.getEventLoopGroup().next())
.resolvedAddressTypes(ResolvedAddressTypes.IPV6_PREFERRED)
.completeOncePreferredResolved(false)
.channelType(NioDatagramChannel.class)
@@ -1013,13 +1024,38 @@ public class WhisperServerService extends Application serverBuilder =
- NettyServerBuilder.forAddress(new InetSocketAddress(config.getGrpc().bindAddress(), config.getGrpc().port()));
+ final ManagedEventLoopGroup omnibusLocalEventLoopGroup = new ManagedEventLoopGroup<>(new DefaultEventLoopGroup());
+ final ManagedEventLoopGroup omnibusNioEventLoopGroup = new ManagedEventLoopGroup<>(new NioEventLoopGroup());
+ final LocalAddress grpcLocalAddress = new LocalAddress("grpc");
+ final ServerBuilder> serverBuilder = NettyServerBuilder
+ .forAddress(grpcLocalAddress)
+ .channelType(LocalServerChannel.class)
+ .bossEventLoopGroup(omnibusLocalEventLoopGroup.getEventLoopGroup())
+ .workerEventLoopGroup(omnibusLocalEventLoopGroup.getEventLoopGroup());
authenticatedServices.forEach(serverBuilder::addService);
unauthenticatedServices.forEach(serverBuilder::addService);
- final ManagedGrpcServer exposedGrpcServer = new ManagedGrpcServer(serverBuilder.build());
+ final ManagedGrpcServer localGrpcServer = new ManagedGrpcServer(serverBuilder.build());
- environment.lifecycle().manage(exposedGrpcServer);
+ final SocketAddress websocketAddress =
+ new InetSocketAddress(config.getGrpc().websocketAddress(), config.getGrpc().websocketPort());
+ final OmnibusRouter omnibusRouter = new OmnibusRouter(List.of(
+ new OmnibusRouter.OmnibusRoute("/v1/websocket", websocketAddress),
+ new OmnibusRouter.OmnibusRoute("/v1/provisioning", websocketAddress)),
+ grpcLocalAddress);
+ @Nullable final Mapping sniMapping = config.getGrpc().h2c()
+ ? null
+ : SniMapper.buildSniMapping(config.getTlsKeyStoreConfiguration().path(), config.getTlsKeyStoreConfiguration().password().value());
+ final OmnibusH2Server omnibusH2Server = new OmnibusH2Server(
+ sniMapping,
+ omnibusNioEventLoopGroup.getEventLoopGroup(),
+ omnibusLocalEventLoopGroup.getEventLoopGroup(),
+ new InetSocketAddress(config.getGrpc().bindAddress(), config.getGrpc().port()), omnibusRouter,
+ config.getGrpc().idleTimeout());
+
+ environment.lifecycle().manage(omnibusLocalEventLoopGroup);
+ environment.lifecycle().manage(omnibusNioEventLoopGroup);
+ environment.lifecycle().manage(localGrpcServer);
+ environment.lifecycle().manage(omnibusH2Server);
final List filters = new ArrayList<>();
filters.add(remoteDeprecationFilter);
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java
index fb3c08742..295879b8f 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java
@@ -5,11 +5,33 @@
package org.whispersystems.textsecuregcm.configuration;
import jakarta.validation.constraints.NotNull;
+import java.time.Duration;
+
+/// Configuration for the gRPC Server
+///
+/// @param bindAddress The host to bind the omnibus server to
+/// @param port The port to bind the omnibus server to
+/// @param websocketAddress The address of a listening websocket server for handling legacy requests
+/// @param websocketPort The port of a listening websocket server for handling legacy requests
+/// @param idleTimeout The duration after which an idle connection may be disconnected
+/// @param h2c If true, listen for plaintext h2c with prior-knowledge
+public record GrpcConfiguration(
+ @NotNull String bindAddress,
+ @NotNull Integer port,
+ @NotNull String websocketAddress,
+ @NotNull Integer websocketPort,
+ @NotNull Duration idleTimeout,
+ boolean h2c) {
-public record GrpcConfiguration(@NotNull String bindAddress, @NotNull Integer port) {
public GrpcConfiguration {
if (bindAddress == null || bindAddress.isEmpty()) {
bindAddress = "localhost";
}
+ if (websocketAddress == null || websocketAddress.isEmpty()) {
+ websocketAddress = "localhost";
+ }
+ if (idleTimeout == null) {
+ idleTimeout = Duration.ofMinutes(5);
+ }
}
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java
index 1bd2c1d68..c38912de8 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java
@@ -7,6 +7,9 @@ package org.whispersystems.textsecuregcm.configuration;
import jakarta.validation.constraints.NotNull;
import org.whispersystems.textsecuregcm.configuration.secrets.SecretString;
+import javax.annotation.Nullable;
-public record TlsKeyStoreConfiguration(@NotNull SecretString password) {
+public record TlsKeyStoreConfiguration(
+ @Nullable String path,
+ @NotNull SecretString password) {
}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java
index eeeab6684..45fd52943 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java
@@ -48,6 +48,8 @@ public class RequestAttributesInterceptor implements ServerInterceptor {
final String acceptLanguageHeader = headers.get(ACCEPT_LANG_KEY);
final String xForwardedForHeader = headers.get(X_FORWARDED_FOR_KEY);
+ // This assumes that X-Forwarded-For has been set by a trusted intermediate proxy. For example, this may be set by
+ // OmnibusH2Server which itself sets X-Forwarded-For using a PPv2 header that comes from a trusted load-balancer.
final Optional remoteAddress = getMostRecentProxy(xForwardedForHeader)
.flatMap(mostRecentProxy -> {
try {
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java
new file mode 100644
index 000000000..e4167a4d4
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.micrometer.core.instrument.Metrics;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2StreamFrame;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+
+/// Writes all inbound H2 frames to [this#peerStream], renumbering the inbound H2 stream-id for peer H2 stream
+public class H2FrameProxyHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(H2FrameProxyHandler.class);
+ private static final String WRITABILITY_CHANGED_COUNTER_NAME = MetricsUtil.name(H2FrameProxyHandler.class, "writabilityChanged");
+
+ private final Channel peerStream;
+ private final String proxyNameTag;
+
+ // If we fail to write to the peerStream, we want to close the inbound channel. Rather than allocate a new listener
+ // that captures the inbound ChannelHandlerContext on every message, we capture the ChannelHandlerContext in
+ // handlerAdded and use it on all forwarded writes. This would not work if we attached this handler to more than
+ // one channel, but we already have a designated peerStream so this handler is fundamentally single-channel.
+ private ChannelFutureListener closeInboundOnPeerFailure = null;
+
+ public H2FrameProxyHandler(final Channel peerStream, final String proxyNameTag) {
+ this.peerStream = peerStream;
+ this.proxyNameTag = proxyNameTag;
+ }
+
+ @Override
+ public void handlerAdded(final ChannelHandlerContext ctx) {
+ closeInboundOnPeerFailure = f -> {
+ if (!f.isSuccess()) {
+ ctx.close();
+ }
+ };
+
+ // When the peer stream we are forwarding to becomes unwritable/writable, stop/start reading from the source stream.
+ // This prevents us from reading from the source stream as fast as we can just to buffer requests for the peer
+ // stream.
+ peerStream.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelWritabilityChanged(final ChannelHandlerContext peerCtx) throws Exception {
+ Metrics.counter(WRITABILITY_CHANGED_COUNTER_NAME,
+ "isWritable", Boolean.toString(peerCtx.channel().isWritable()),
+ "proxy", proxyNameTag)
+ .increment();
+ ctx.channel().config().setAutoRead(peerStream.isWritable());
+ super.channelWritabilityChanged(peerCtx);
+ }
+ });
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ logger.trace("Received frame {}", msg);
+ if (!(msg instanceof Http2StreamFrame streamFrame)) {
+ logger.error("Received unexpected frame {}", msg);
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ return;
+ }
+
+ // Clear the stream-id on this frame, so netty will associate it with the peerStream's stream-id. The inbound
+ // frame has a stream-id associated with the inbound connection. This will not match the stream-id of the peer
+ // stream we are forwarding the frames to. If the stream-id on a frame is not set, netty handles sending the
+ // stream-id on the frame to the target stream's stream-id.
+ streamFrame.stream(null);
+
+ peerStream.writeAndFlush(streamFrame).addListener(closeInboundOnPeerFailure);
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ peerStream.close();
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ logger.warn("Exception proxying frames", cause);
+ ctx.close();
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java
new file mode 100644
index 000000000..5c77b5df3
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.dropwizard.lifecycle.Managed;
+import io.netty.channel.EventLoopGroup;
+
+/**
+ * A wrapper for a Netty {@link EventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing
+ * Dropwizard to manage the lifecycle of the event loop group.
+ */
+public class ManagedEventLoopGroup implements Managed {
+
+ private final T eventLoopGroup;
+
+ public ManagedEventLoopGroup(final T eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
+ }
+
+ @Override
+ public void stop() throws Exception {
+ this.eventLoopGroup.shutdownGracefully().await();
+ }
+
+ public T getEventLoopGroup() {
+ return eventLoopGroup;
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java
deleted file mode 100644
index 06d3e97db..000000000
--- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.whispersystems.textsecuregcm.grpc.net;
-
-import io.dropwizard.lifecycle.Managed;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-/**
- * A wrapper for a Netty {@link NioEventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing
- * Dropwizard to manage the lifecycle of the event loop group.
- */
-public class ManagedNioEventLoopGroup extends NioEventLoopGroup implements Managed {
-
- @Override
- public void stop() throws Exception {
- this.shutdownGracefully().await();
- }
-}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java
new file mode 100644
index 000000000..ef4199a93
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+import java.util.concurrent.atomic.AtomicLong;
+
+@ChannelHandler.Sharable
+public class OmnibusConnectionCounterHandler extends ChannelInboundHandlerAdapter {
+ private final AtomicLong openConnections;
+ private final Counter acceptedConnectionsCounter =
+ Metrics.counter(MetricsUtil.name(OmnibusConnectionCounterHandler.class, "connectionsAccepted"));
+
+ public OmnibusConnectionCounterHandler() {
+ openConnections =
+ Metrics.gauge(MetricsUtil.name(OmnibusConnectionCounterHandler.class, "openConnections"), new AtomicLong());
+ }
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ acceptedConnectionsCounter.increment();
+ openConnections.incrementAndGet();
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ openConnections.decrementAndGet();
+ super.channelInactive(ctx);
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java
new file mode 100644
index 000000000..325b52c9b
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.micrometer.core.instrument.Metrics;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+
+/// A handler that closes the channel on an exception and records errors in a counter. This should be placed at the tail
+/// of pipelines to catch uncaught exceptions gracefully
+@ChannelHandler.Sharable
+public class OmnibusExceptionHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(OmnibusExceptionHandler.class);
+
+ private static final String UNCAUGHT_EXCEPTION_COUNTER_NAME = MetricsUtil.name(OmnibusExceptionHandler.class,
+ "uncaughtException");
+
+ private final String channelName;
+ private final List> expectedExceptions;
+
+ public OmnibusExceptionHandler(final String channelName, final List> expectedExceptions) {
+ this.channelName = channelName;
+ this.expectedExceptions = expectedExceptions;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ Metrics.counter(UNCAUGHT_EXCEPTION_COUNTER_NAME,
+ "channelName", channelName,
+ "exceptionClass", cause.getClass().getSimpleName())
+ .increment();
+
+ // There are 'expected' ways to get exceptions on a channel (e.g. client disconnects) so we only log them at debug.
+ if (expectedException(cause)) {
+ logger.debug("uncaught exception on channel {}", channelName, cause);
+ } else {
+ logger.warn("unexpected uncaught exception on channel {}", channelName, cause);
+ }
+ ctx.close();
+ }
+
+ private boolean expectedException(final Throwable exception) {
+ return expectedExceptions
+ .stream()
+ .anyMatch(expectedException -> expectedException.isInstance(exception));
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java
new file mode 100644
index 000000000..7bfa7ef95
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.dropwizard.lifecycle.Managed;
+import io.micrometer.core.instrument.Metrics;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufAllocatorMetricProvider;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.SimpleUserEventChannelHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DecoderException;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
+import io.netty.handler.ssl.SniHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.Mapping;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
+
+/// An HTTP/2 server that proxies H2 streams to configurable backends via path-based routing
+public class OmnibusH2Server implements Managed {
+
+ private static final Logger logger = LoggerFactory.getLogger(OmnibusH2Server.class);
+
+ private static final OmnibusConnectionCounterHandler CONNECT_COUNTER = new OmnibusConnectionCounterHandler();
+ private static final OmnibusExceptionHandler HANDSHAKE_EXCEPTION_HANDLER =
+ new OmnibusExceptionHandler("omnibus-handshake", List.of(SocketException.class, DecoderException.class, IOException.class));
+ private static final OmnibusExceptionHandler SESSION_EXCEPTION_HANDLER =
+ new OmnibusExceptionHandler("omnibus-session", List.of(Http2Exception.class));
+ private static final String IDLE_DISCONNECT_COUNTER_NAME = MetricsUtil.name(OmnibusH2Server.class, "idleDisconnect");
+
+ private final @Nullable Mapping sslContextBySni;
+ private final OmnibusRouter router;
+ private final Duration idleTimeout;
+ private final DefaultEventLoopGroup localEventLoopGroup;
+ private final NioEventLoopGroup nioEventLoopGroup;
+ private final SocketAddress bindAddress;
+
+ private Channel serverChannel;
+
+ /// Create an omnibus server
+ ///
+ /// @param sslContextBySni If not null, a mapping between domain (SNI) and the appropriate SslContext to use for
+ /// that SNI. If null, the server will not include TLS (h2c with prior-knowledge)
+ /// @param nioEventLoopGroup Event loop to use for all NIO channel pipelines
+ /// @param localEventLoopGroup Event loop to use for all local channel pipelines
+ /// @param bindAddress The address the server should listen on
+ /// @param router How the server should select backends based on request paths
+ public OmnibusH2Server(
+ final @Nullable Mapping sslContextBySni,
+ final NioEventLoopGroup nioEventLoopGroup,
+ final DefaultEventLoopGroup localEventLoopGroup,
+ final SocketAddress bindAddress,
+ final OmnibusRouter router,
+ final Duration idleTimeout) {
+ this.sslContextBySni = sslContextBySni;
+ this.nioEventLoopGroup = nioEventLoopGroup;
+ this.localEventLoopGroup = localEventLoopGroup;
+ this.bindAddress = bindAddress;
+ this.router = router;
+ this.idleTimeout = idleTimeout;
+ }
+
+ @Override
+ public void start() throws Exception {
+ if (this.sslContextBySni == null) {
+ logger.warn("No SSL configuration provided for OmnibusH2Server, serving h2c");
+ }
+
+ if (ByteBufAllocator.DEFAULT instanceof ByteBufAllocatorMetricProvider alloc) {
+ Metrics.gauge(MetricsUtil.name(OmnibusH2Server.class, "nettyUsedDirectMemory"),
+ alloc,
+ allocator -> allocator.metric().usedDirectMemory());
+ Metrics.gauge(MetricsUtil.name(OmnibusH2Server.class, "nettyUsedHeapMemory"),
+ alloc,
+ allocator -> allocator.metric().usedHeapMemory());
+ }
+
+ final ServerBootstrap bootstrap = new ServerBootstrap()
+ .group(nioEventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ ch.pipeline().addLast(new IdleStateHandler(0, 0, idleTimeout.toMillis(), TimeUnit.MILLISECONDS));
+ ch.pipeline().addLast(new SimpleUserEventChannelHandler() {
+ @Override
+ protected void eventReceived(final ChannelHandlerContext ctx, final IdleStateEvent evt) {
+ Metrics.counter(IDLE_DISCONNECT_COUNTER_NAME, "type", evt.state().name()).increment();
+ ctx.close();
+ }
+ });
+ ch.pipeline().addLast(CONNECT_COUNTER);
+ ch.pipeline().addLast(new ProxyProtocolHandler());
+ ch.pipeline().addLast(new ProxyMessageAttributeSetterHandler());
+ if (sslContextBySni == null) {
+ configureH2Pipeline(ch.pipeline());
+ } else {
+ ch.pipeline().addLast(new SniHandler(sslContextBySni));
+ ch.pipeline().addLast(new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
+ @Override
+ protected void configurePipeline(final ChannelHandlerContext ctx, final String protocol) {
+ if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) {
+ // HTTP/2 should be enforced by our ALPN settings
+ logger.error("Unsupported protocol negotiated: {}, closing connection", protocol);
+ ctx.close();
+ return;
+ }
+ configureH2Pipeline(ctx.pipeline());
+ }
+ });
+ ch.pipeline().addLast(HANDSHAKE_EXCEPTION_HANDLER);
+ }
+ }
+ });
+
+ serverChannel = bootstrap.bind(bindAddress).sync().channel();
+ logger.info("Omnibus server listening on {}", getLocalAddress());
+ }
+
+ @VisibleForTesting
+ InetSocketAddress getLocalAddress() {
+ return (InetSocketAddress) serverChannel.localAddress();
+ }
+
+ @Override
+ public void stop() {
+ if (serverChannel != null) {
+ logger.info("Stopping omnibus server");
+ serverChannel.close().syncUninterruptibly();
+ logger.info("Omnibus server stopped");
+ }
+ }
+
+ private void configureH2Pipeline(final ChannelPipeline pipeline) {
+ // Advertise support for RFC-8441 extended connect
+ final Http2Settings settings = Http2Settings.defaultSettings().connectProtocolEnabled(true);
+ pipeline.addLast(Http2FrameCodecBuilder.forServer().initialSettings(settings).build());
+ pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(final Http2StreamChannel ch) {
+ ch.pipeline().addLast(new OmnibusH2StreamHandler(nioEventLoopGroup, localEventLoopGroup, router));
+ }
+ }));
+ pipeline.addLast(SESSION_EXCEPTION_HANDLER);
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java
new file mode 100644
index 000000000..81c4cc53e
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Handler added on each newly created [Http2StreamChannel] on an H2 connection. Inspects the [Http2HeadersFrame] and
+/// determines which backend to forward the stream to, and then proxies frames to and from the backend.
+///
+/// When this handler receives an H2 header for a new stream-id=X on our parent H2 connection it will
+/// - Receive the stream-id=X header and check the path to determine the correct backend
+/// - Make a new H2 connection to the backend
+/// - Forward the header with stream-id=Y to the backend
+/// - Install a [H2FrameProxyHandler] on the backend stream pipeline that forwards the received stream-id=Y frames from
+/// the backend back to the client on stream-id=X
+/// - Install a [H2FrameProxyHandler] on the client stream pipeline forwards the received stream-id=X frames from the
+/// client to the backend on stream-id=Y
+public class OmnibusH2StreamHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(OmnibusH2StreamHandler.class);
+
+ private static final OmnibusExceptionHandler BACKEND_CONNECTION_EXCEPTION_HANDLER =
+ new OmnibusExceptionHandler("backend-connection", List.of());
+ private static final String BACKEND_STREAM_COUNTER_NAME = name(OmnibusH2StreamHandler.class, "backendStream");
+ private static final String BACKEND_CONNECT_DURATION_NAME = name(OmnibusH2StreamHandler.class,
+ "backendConnectDuration");
+ private static final String BACKEND_TAG = "backend";
+
+ private final OmnibusRouter router;
+
+ private final DefaultEventLoopGroup localEventLoopGroup;
+ private final NioEventLoopGroup nioEventLoopGroup;
+
+ public OmnibusH2StreamHandler(
+ final NioEventLoopGroup nioEventLoopGroup,
+ final DefaultEventLoopGroup localEventLoopGroup,
+ final OmnibusRouter router) {
+ this.router = router;
+ this.localEventLoopGroup = localEventLoopGroup;
+ this.nioEventLoopGroup = nioEventLoopGroup;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ if (!(msg instanceof Http2HeadersFrame headersFrame)) {
+ logger.warn("Expected initial HEADERS frame but got {}", msg.getClass().getSimpleName());
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ return;
+ }
+
+ // We don't expect headers frames to come with manually managed memory attached. Assert this in case this changes
+ // in the future, but for now we don't have to worry about freeing the headers frame
+ assert !(headersFrame instanceof ReferenceCounted);
+
+ // Disable reading from the client because we want to wait until we make the backend connection and install the
+ // forwarding handler before processing any more frames.
+ ctx.channel().config().setAutoRead(false);
+
+ // Select the target backend based on the path
+ final String path = Optional.ofNullable(headersFrame.headers().path()).map(CharSequence::toString).orElse("");
+ final SocketAddress target = router.match(path);
+ final String backendTag = target.toString();
+
+ Metrics.counter(BACKEND_STREAM_COUNTER_NAME, BACKEND_TAG, backendTag).increment();
+
+ // Set X-Forwarded-For from the PROXY protocol header if present, otherwise via the remote address
+ final InetAddress proxyRemoteAddress = ctx.channel().parent()
+ .attr(ProxyMessageAttributeSetterHandler.PROXY_REMOTE_ADDRESS)
+ .get();
+ headersFrame.headers().set("x-forwarded-for", proxyRemoteAddress != null
+ ? proxyRemoteAddress.getHostAddress()
+ : ((InetSocketAddress) ctx.channel().remoteAddress()).getHostString());
+
+ // Make a new H2 connection to the target backend
+ final Timer.Sample connectSample = Timer.start();
+ new Bootstrap()
+ .group(selectEventLoop(ctx, target))
+ .channel(target instanceof LocalAddress ? LocalChannel.class : NioSocketChannel.class)
+ .handler(new ChannelInitializer<>() {
+ @Override
+ protected void initChannel(final Channel ch) {
+ ch.pipeline()
+ .addLast(Http2FrameCodecBuilder.forClient().initialSettings(Http2Settings.defaultSettings()).build());
+ // Http2MultiplexHandler takes handler that is added to new inbound streams. A client Http2MultiplexHandler
+ // like we're defining here should never receive an inbound H2 stream so we can just pass a noop handler
+ ch.pipeline().addLast(new Http2MultiplexHandler(new NoopInboundStreamHandler()));
+ ch.pipeline().addLast(BACKEND_CONNECTION_EXCEPTION_HANDLER);
+ }
+ })
+ .connect(target)
+ .addListener((ChannelFuture connectFuture) -> {
+ connectSample.stop(Timer.builder(BACKEND_CONNECT_DURATION_NAME)
+ .tag(BACKEND_TAG, backendTag)
+ .tag("outcome", connectFuture.isSuccess() ? "success" : "failure")
+ .register(Metrics.globalRegistry));
+
+ if (!connectFuture.isSuccess()) {
+ // Close the client stream with a 502: Bad Gateway if the backend wasn't available
+ logger.warn("Failed to connect to backend {}", target, connectFuture.cause());
+ ctx.channel()
+ .writeAndFlush(new DefaultHttp2HeadersFrame(
+ new DefaultHttp2Headers().status("502"), true))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+
+ // Connected, open a new H2 stream to the backend so we can proxy the client's frames
+ logger.trace("Opening a HTTP/2 stream to the backend {}", target);
+ final Channel backendConnection = connectFuture.channel();
+ createBackendProxyStream(ctx, backendConnection, headersFrame);
+ });
+ }
+
+ /// Create a proxy stream on the provided `backendConnection` that forwards H2 frames to/from the client H2 stream.
+ ///
+ /// @param clientStreamCtx The context for a client H2 stream that targets the backend
+ /// @param backendConnection An established H2 connection [Channel], on which a new h2 stream will be opened
+ /// @param headersFrame The first `headersFrame` from the client h2 stream that should be forwarded to the new
+ /// backend stream
+ private void createBackendProxyStream(
+ final ChannelHandlerContext clientStreamCtx,
+ final Channel backendConnection,
+ final Http2HeadersFrame headersFrame) {
+ new Http2StreamChannelBootstrap(backendConnection)
+ // Forwards response frames from the backend back to the client stream
+ .handler(new H2FrameProxyHandler(clientStreamCtx.channel(), "responseStream"))
+ .open()
+ .addListener((io.netty.util.concurrent.Future streamFuture) -> {
+ if (!streamFuture.isSuccess()) {
+ logger.warn("Failed to open backend stream", streamFuture.cause());
+ clientStreamCtx.channel()
+ .writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR))
+ .addListener(ChannelFutureListener.CLOSE);
+ backendConnection.close();
+ return;
+ }
+
+ final Http2StreamChannel backendStream = streamFuture.getNow();
+
+ // Close the entire H2 connection whenever the stream we just opened closes. We only plan on using
+ // a single stream on this connection.
+ backendStream.closeFuture().addListener(_ -> backendConnection.close());
+
+ // We're going to modify the inbound H2 stream channel, which runs on a different eventloop than the
+ // outbound channel we've made to the backend. We have to submit our updates back to the inbound
+ // channel's event loop for thread safety
+ clientStreamCtx.channel().eventLoop().execute(() -> {
+
+ if (!clientStreamCtx.channel().isActive()) {
+ // The client disconnected already and the client pipeline is already torn down.
+ backendConnection.close();
+ return;
+ }
+
+ // Install proxy on client stream, remove this handler, then fire the buffered headers through the proxy
+ clientStreamCtx.pipeline().replace(
+ OmnibusH2StreamHandler.this,
+ "backend-to-client-proxy",
+ new H2FrameProxyHandler(backendStream, "requestStream"));
+ clientStreamCtx.channel().pipeline().fireChannelRead(headersFrame);
+
+ // Resume inbound reads, which should now be forwarded
+ clientStreamCtx.channel().config().setAutoRead(true);
+ });
+ });
+ }
+
+ private EventLoopGroup selectEventLoop(final ChannelHandlerContext inboundCtx, SocketAddress target) {
+ final boolean localInbound = inboundCtx.channel() instanceof LocalChannel;
+ final boolean localTarget = target instanceof LocalAddress;
+
+ // If the inbound eventloop matches the target type, we can just reuse the inbound's event loop
+ if (localInbound == localTarget) {
+ return inboundCtx.channel().eventLoop();
+ }
+
+ return localTarget ? this.localEventLoopGroup : this.nioEventLoopGroup;
+ }
+
+ private static class NoopInboundStreamHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void channelRegistered(final ChannelHandlerContext ctx) {
+ logger.error("Inbound stream handler was registered when no inbound streams expected");
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ logger.error("Received unexpected message: {} on inbound stream from backend", msg);
+ super.channelRead(ctx, msg);
+ }
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java
new file mode 100644
index 000000000..1365a0c2b
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+public class OmnibusRouter {
+
+ public record OmnibusRoute(String prefix, SocketAddress backend) {}
+
+ private final List prefixRoutes;
+ private final SocketAddress defaultBackend;
+
+ public OmnibusRouter(final List prefixRoutes, final SocketAddress defaultBackend) {
+ this.prefixRoutes = prefixRoutes;
+ this.defaultBackend = defaultBackend;
+ }
+
+ SocketAddress match(final String path) {
+ for (final OmnibusRoute route : prefixRoutes) {
+ if (path.startsWith(route.prefix)) {
+ return route.backend;
+ }
+ }
+ return defaultBackend;
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java
new file mode 100644
index 000000000..97d13e692
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.util.AttributeKey;
+import java.net.InetAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Reads the decoded [HAProxyMessage], stores the source address as a channel attribute, and removes itself.
+class ProxyMessageAttributeSetterHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(ProxyMessageAttributeSetterHandler.class);
+
+ /// Attribute for the remote address extracted from a proxy protocol header
+ static final AttributeKey PROXY_REMOTE_ADDRESS = AttributeKey.newInstance("proxyRemoteAddress");
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+ if (!(msg instanceof HAProxyMessage proxyMessage)) {
+ ctx.pipeline().remove(this);
+ ctx.fireChannelRead(msg);
+ return;
+ }
+
+ try {
+ final String sourceAddress = proxyMessage.sourceAddress();
+ if (sourceAddress != null) {
+ ctx.channel().attr(PROXY_REMOTE_ADDRESS).set(InetAddress.getByName(sourceAddress));
+ } else {
+ logger.warn("PROXY protocol message has no source address");
+ }
+ } finally {
+ proxyMessage.release();
+ ctx.pipeline().remove(this);
+ }
+ }
+}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java
new file mode 100644
index 000000000..7b4b3b3c3
--- /dev/null
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2026 Signal Messenger, LLC
+ * SPDX-License-Identifier: AGPL-3.0-only
+ */
+package org.whispersystems.textsecuregcm.grpc.net;
+
+import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
+
+import io.micrometer.core.instrument.Metrics;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.ProtocolDetectionResult;
+import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import java.util.List;
+
+class ProxyProtocolHandler extends ByteToMessageDecoder {
+
+ private static final String PROXY_PROTOCOL_DETECTED_NAME =
+ name(ProxyProtocolHandler.class, "proxyProtocol");
+
+ @Override
+ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List