From 0beeb8a9356dc573b960743125bd6246ee7d0780 Mon Sep 17 00:00:00 2001 From: Ravi Khadiwala Date: Wed, 8 Apr 2026 18:01:54 -0500 Subject: [PATCH] Add h2 omnibus server --- pom.xml | 4 +- service/config/sample.yml | 1 + .../textsecuregcm/WhisperServerService.java | 50 +- .../configuration/GrpcConfiguration.java | 24 +- .../TlsKeyStoreConfiguration.java | 5 +- .../grpc/RequestAttributesInterceptor.java | 2 + .../grpc/net/H2FrameProxyHandler.java | 91 +++ .../grpc/net/ManagedEventLoopGroup.java | 30 + .../grpc/net/ManagedNioEventLoopGroup.java | 16 - .../net/OmnibusConnectionCounterHandler.java | 38 ++ .../grpc/net/OmnibusExceptionHandler.java | 55 ++ .../grpc/net/OmnibusH2Server.java | 176 +++++ .../grpc/net/OmnibusH2StreamHandler.java | 230 +++++++ .../textsecuregcm/grpc/net/OmnibusRouter.java | 30 + .../ProxyMessageAttributeSetterHandler.java | 42 ++ .../grpc/net/ProxyProtocolHandler.java | 48 ++ .../textsecuregcm/grpc/net/SniMapper.java | 164 +++++ .../JettyHttpConfigurationCustomizer.java | 2 - .../redis/FaultTolerantRedisClient.java | 13 +- .../FaultTolerantRedisClusterClient.java | 23 +- .../WhisperServerServiceTest.java | 82 ++- .../grpc/net/AbstractLeakDetectionTest.java | 25 + .../grpc/net/H2FrameProxyHandlerTest.java | 35 + .../grpc/net/OmnibusH2ServerTest.java | 600 ++++++++++++++++++ .../grpc/net/ProxyProtocolHandlerTest.java | 53 ++ .../textsecuregcm/grpc/net/SniMapperTest.java | 179 ++++++ .../FaultTolerantRedisClusterClientTest.java | 7 - service/src/test/resources/config/test.yml | 4 + .../generate-omnibus-h2-server-test-certs.sh | 19 + .../net/generate-sni-mapping-test-certs.sh | 36 ++ .../net/omnibus-h2-server-test-keystore.p12 | Bin 0 -> 2658 bytes .../grpc/net/sni-mapper-test-keystore.p12 | Bin 0 -> 6509 bytes 32 files changed, 2009 insertions(+), 75 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/SniMapper.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/AbstractLeakDetectionTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandlerTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2ServerTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandlerTest.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/SniMapperTest.java create mode 100755 service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-omnibus-h2-server-test-certs.sh create mode 100755 service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-sni-mapping-test-certs.sh create mode 100644 service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/omnibus-h2-server-test-keystore.p12 create mode 100644 service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/sni-mapper-test-keystore.p12 diff --git a/pom.xml b/pom.xml index 6d4ee2c4c..0b1c6d688 100644 --- a/pom.xml +++ b/pom.xml @@ -70,13 +70,13 @@ 2.3.20 1.5.32 2.0.12 - 6.8.2.RELEASE + 7.5.1.RELEASE 9.0.21 8.1 2.25.4 3.5.0 1.16.4 - 4.1.127.Final + 4.2.13.Final 4.33.2 diff --git a/service/config/sample.yml b/service/config/sample.yml index 10818ef58..9ceaafc61 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -517,6 +517,7 @@ idlePrimaryDeviceReminder: grpc: port: 50051 + websocketPort: 8080 asnTable: s3Region: a-region diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 2c6b2654d..30a969b12 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -31,16 +31,23 @@ import io.lettuce.core.metrics.MicrometerOptions; import io.lettuce.core.resource.ClientResources; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslContext; import io.netty.resolver.ResolvedAddressTypes; import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; +import io.netty.util.Mapping; import jakarta.servlet.DispatcherType; import jakarta.servlet.Filter; import jakarta.servlet.ServletRegistration; import java.io.ByteArrayInputStream; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.http.HttpClient; import java.nio.charset.StandardCharsets; import java.time.Clock; @@ -164,7 +171,10 @@ import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService; import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor; import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor; import org.whispersystems.textsecuregcm.grpc.net.ManagedGrpcServer; -import org.whispersystems.textsecuregcm.grpc.net.ManagedNioEventLoopGroup; +import org.whispersystems.textsecuregcm.grpc.net.ManagedEventLoopGroup; +import org.whispersystems.textsecuregcm.grpc.net.OmnibusH2Server; +import org.whispersystems.textsecuregcm.grpc.net.OmnibusRouter; +import org.whispersystems.textsecuregcm.grpc.net.SniMapper; import org.whispersystems.textsecuregcm.jetty.JettyHttpConfigurationCustomizer; import org.whispersystems.textsecuregcm.keytransparency.KeyTransparencyServiceClient; import org.whispersystems.textsecuregcm.limits.CardinalityEstimator; @@ -314,6 +324,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.s3.S3AsyncClient; +import javax.annotation.Nullable; public class WhisperServerService extends Application { @@ -619,8 +630,8 @@ public class WhisperServerService extends Application dnsResolutionEventLoopGroup = new ManagedEventLoopGroup<>(new NioEventLoopGroup()); + final DnsNameResolver cloudflareDnsResolver = new DnsNameResolverBuilder(dnsResolutionEventLoopGroup.getEventLoopGroup().next()) .resolvedAddressTypes(ResolvedAddressTypes.IPV6_PREFERRED) .completeOncePreferredResolved(false) .channelType(NioDatagramChannel.class) @@ -1013,13 +1024,38 @@ public class WhisperServerService extends Application serverBuilder = - NettyServerBuilder.forAddress(new InetSocketAddress(config.getGrpc().bindAddress(), config.getGrpc().port())); + final ManagedEventLoopGroup omnibusLocalEventLoopGroup = new ManagedEventLoopGroup<>(new DefaultEventLoopGroup()); + final ManagedEventLoopGroup omnibusNioEventLoopGroup = new ManagedEventLoopGroup<>(new NioEventLoopGroup()); + final LocalAddress grpcLocalAddress = new LocalAddress("grpc"); + final ServerBuilder serverBuilder = NettyServerBuilder + .forAddress(grpcLocalAddress) + .channelType(LocalServerChannel.class) + .bossEventLoopGroup(omnibusLocalEventLoopGroup.getEventLoopGroup()) + .workerEventLoopGroup(omnibusLocalEventLoopGroup.getEventLoopGroup()); authenticatedServices.forEach(serverBuilder::addService); unauthenticatedServices.forEach(serverBuilder::addService); - final ManagedGrpcServer exposedGrpcServer = new ManagedGrpcServer(serverBuilder.build()); + final ManagedGrpcServer localGrpcServer = new ManagedGrpcServer(serverBuilder.build()); - environment.lifecycle().manage(exposedGrpcServer); + final SocketAddress websocketAddress = + new InetSocketAddress(config.getGrpc().websocketAddress(), config.getGrpc().websocketPort()); + final OmnibusRouter omnibusRouter = new OmnibusRouter(List.of( + new OmnibusRouter.OmnibusRoute("/v1/websocket", websocketAddress), + new OmnibusRouter.OmnibusRoute("/v1/provisioning", websocketAddress)), + grpcLocalAddress); + @Nullable final Mapping sniMapping = config.getGrpc().h2c() + ? null + : SniMapper.buildSniMapping(config.getTlsKeyStoreConfiguration().path(), config.getTlsKeyStoreConfiguration().password().value()); + final OmnibusH2Server omnibusH2Server = new OmnibusH2Server( + sniMapping, + omnibusNioEventLoopGroup.getEventLoopGroup(), + omnibusLocalEventLoopGroup.getEventLoopGroup(), + new InetSocketAddress(config.getGrpc().bindAddress(), config.getGrpc().port()), omnibusRouter, + config.getGrpc().idleTimeout()); + + environment.lifecycle().manage(omnibusLocalEventLoopGroup); + environment.lifecycle().manage(omnibusNioEventLoopGroup); + environment.lifecycle().manage(localGrpcServer); + environment.lifecycle().manage(omnibusH2Server); final List filters = new ArrayList<>(); filters.add(remoteDeprecationFilter); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java index fb3c08742..295879b8f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/GrpcConfiguration.java @@ -5,11 +5,33 @@ package org.whispersystems.textsecuregcm.configuration; import jakarta.validation.constraints.NotNull; +import java.time.Duration; + +/// Configuration for the gRPC Server +/// +/// @param bindAddress The host to bind the omnibus server to +/// @param port The port to bind the omnibus server to +/// @param websocketAddress The address of a listening websocket server for handling legacy requests +/// @param websocketPort The port of a listening websocket server for handling legacy requests +/// @param idleTimeout The duration after which an idle connection may be disconnected +/// @param h2c If true, listen for plaintext h2c with prior-knowledge +public record GrpcConfiguration( + @NotNull String bindAddress, + @NotNull Integer port, + @NotNull String websocketAddress, + @NotNull Integer websocketPort, + @NotNull Duration idleTimeout, + boolean h2c) { -public record GrpcConfiguration(@NotNull String bindAddress, @NotNull Integer port) { public GrpcConfiguration { if (bindAddress == null || bindAddress.isEmpty()) { bindAddress = "localhost"; } + if (websocketAddress == null || websocketAddress.isEmpty()) { + websocketAddress = "localhost"; + } + if (idleTimeout == null) { + idleTimeout = Duration.ofMinutes(5); + } } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java index 1bd2c1d68..c38912de8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/TlsKeyStoreConfiguration.java @@ -7,6 +7,9 @@ package org.whispersystems.textsecuregcm.configuration; import jakarta.validation.constraints.NotNull; import org.whispersystems.textsecuregcm.configuration.secrets.SecretString; +import javax.annotation.Nullable; -public record TlsKeyStoreConfiguration(@NotNull SecretString password) { +public record TlsKeyStoreConfiguration( + @Nullable String path, + @NotNull SecretString password) { } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java index eeeab6684..45fd52943 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/RequestAttributesInterceptor.java @@ -48,6 +48,8 @@ public class RequestAttributesInterceptor implements ServerInterceptor { final String acceptLanguageHeader = headers.get(ACCEPT_LANG_KEY); final String xForwardedForHeader = headers.get(X_FORWARDED_FOR_KEY); + // This assumes that X-Forwarded-For has been set by a trusted intermediate proxy. For example, this may be set by + // OmnibusH2Server which itself sets X-Forwarded-For using a PPv2 header that comes from a trusted load-balancer. final Optional remoteAddress = getMostRecentProxy(xForwardedForHeader) .flatMap(mostRecentProxy -> { try { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java new file mode 100644 index 000000000..e4167a4d4 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandler.java @@ -0,0 +1,91 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import io.micrometer.core.instrument.Metrics; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http2.Http2StreamFrame; +import io.netty.util.ReferenceCountUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; + +/// Writes all inbound H2 frames to [this#peerStream], renumbering the inbound H2 stream-id for peer H2 stream +public class H2FrameProxyHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(H2FrameProxyHandler.class); + private static final String WRITABILITY_CHANGED_COUNTER_NAME = MetricsUtil.name(H2FrameProxyHandler.class, "writabilityChanged"); + + private final Channel peerStream; + private final String proxyNameTag; + + // If we fail to write to the peerStream, we want to close the inbound channel. Rather than allocate a new listener + // that captures the inbound ChannelHandlerContext on every message, we capture the ChannelHandlerContext in + // handlerAdded and use it on all forwarded writes. This would not work if we attached this handler to more than + // one channel, but we already have a designated peerStream so this handler is fundamentally single-channel. + private ChannelFutureListener closeInboundOnPeerFailure = null; + + public H2FrameProxyHandler(final Channel peerStream, final String proxyNameTag) { + this.peerStream = peerStream; + this.proxyNameTag = proxyNameTag; + } + + @Override + public void handlerAdded(final ChannelHandlerContext ctx) { + closeInboundOnPeerFailure = f -> { + if (!f.isSuccess()) { + ctx.close(); + } + }; + + // When the peer stream we are forwarding to becomes unwritable/writable, stop/start reading from the source stream. + // This prevents us from reading from the source stream as fast as we can just to buffer requests for the peer + // stream. + peerStream.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelWritabilityChanged(final ChannelHandlerContext peerCtx) throws Exception { + Metrics.counter(WRITABILITY_CHANGED_COUNTER_NAME, + "isWritable", Boolean.toString(peerCtx.channel().isWritable()), + "proxy", proxyNameTag) + .increment(); + ctx.channel().config().setAutoRead(peerStream.isWritable()); + super.channelWritabilityChanged(peerCtx); + } + }); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + logger.trace("Received frame {}", msg); + if (!(msg instanceof Http2StreamFrame streamFrame)) { + logger.error("Received unexpected frame {}", msg); + ReferenceCountUtil.release(msg); + ctx.close(); + return; + } + + // Clear the stream-id on this frame, so netty will associate it with the peerStream's stream-id. The inbound + // frame has a stream-id associated with the inbound connection. This will not match the stream-id of the peer + // stream we are forwarding the frames to. If the stream-id on a frame is not set, netty handles sending the + // stream-id on the frame to the target stream's stream-id. + streamFrame.stream(null); + + peerStream.writeAndFlush(streamFrame).addListener(closeInboundOnPeerFailure); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) { + peerStream.close(); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + logger.warn("Exception proxying frames", cause); + ctx.close(); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java new file mode 100644 index 000000000..5c77b5df3 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedEventLoopGroup.java @@ -0,0 +1,30 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import io.dropwizard.lifecycle.Managed; +import io.netty.channel.EventLoopGroup; + +/** + * A wrapper for a Netty {@link EventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing + * Dropwizard to manage the lifecycle of the event loop group. + */ +public class ManagedEventLoopGroup implements Managed { + + private final T eventLoopGroup; + + public ManagedEventLoopGroup(final T eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + } + + @Override + public void stop() throws Exception { + this.eventLoopGroup.shutdownGracefully().await(); + } + + public T getEventLoopGroup() { + return eventLoopGroup; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java deleted file mode 100644 index 06d3e97db..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ManagedNioEventLoopGroup.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.whispersystems.textsecuregcm.grpc.net; - -import io.dropwizard.lifecycle.Managed; -import io.netty.channel.nio.NioEventLoopGroup; - -/** - * A wrapper for a Netty {@link NioEventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing - * Dropwizard to manage the lifecycle of the event loop group. - */ -public class ManagedNioEventLoopGroup extends NioEventLoopGroup implements Managed { - - @Override - public void stop() throws Exception { - this.shutdownGracefully().await(); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java new file mode 100644 index 000000000..ef4199a93 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusConnectionCounterHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; +import java.util.concurrent.atomic.AtomicLong; + +@ChannelHandler.Sharable +public class OmnibusConnectionCounterHandler extends ChannelInboundHandlerAdapter { + private final AtomicLong openConnections; + private final Counter acceptedConnectionsCounter = + Metrics.counter(MetricsUtil.name(OmnibusConnectionCounterHandler.class, "connectionsAccepted")); + + public OmnibusConnectionCounterHandler() { + openConnections = + Metrics.gauge(MetricsUtil.name(OmnibusConnectionCounterHandler.class, "openConnections"), new AtomicLong()); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + acceptedConnectionsCounter.increment(); + openConnections.incrementAndGet(); + super.channelRegistered(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + openConnections.decrementAndGet(); + super.channelInactive(ctx); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java new file mode 100644 index 000000000..325b52c9b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusExceptionHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import io.micrometer.core.instrument.Metrics; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; + +/// A handler that closes the channel on an exception and records errors in a counter. This should be placed at the tail +/// of pipelines to catch uncaught exceptions gracefully +@ChannelHandler.Sharable +public class OmnibusExceptionHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(OmnibusExceptionHandler.class); + + private static final String UNCAUGHT_EXCEPTION_COUNTER_NAME = MetricsUtil.name(OmnibusExceptionHandler.class, + "uncaughtException"); + + private final String channelName; + private final List> expectedExceptions; + + public OmnibusExceptionHandler(final String channelName, final List> expectedExceptions) { + this.channelName = channelName; + this.expectedExceptions = expectedExceptions; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Metrics.counter(UNCAUGHT_EXCEPTION_COUNTER_NAME, + "channelName", channelName, + "exceptionClass", cause.getClass().getSimpleName()) + .increment(); + + // There are 'expected' ways to get exceptions on a channel (e.g. client disconnects) so we only log them at debug. + if (expectedException(cause)) { + logger.debug("uncaught exception on channel {}", channelName, cause); + } else { + logger.warn("unexpected uncaught exception on channel {}", channelName, cause); + } + ctx.close(); + } + + private boolean expectedException(final Throwable exception) { + return expectedExceptions + .stream() + .anyMatch(expectedException -> expectedException.isInstance(exception)); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java new file mode 100644 index 000000000..7bfa7ef95 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2Server.java @@ -0,0 +1,176 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import com.google.common.annotations.VisibleForTesting; +import io.dropwizard.lifecycle.Managed; +import io.micrometer.core.instrument.Metrics; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufAllocatorMetricProvider; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.SimpleUserEventChannelHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; +import io.netty.handler.ssl.SniHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.Mapping; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.metrics.MetricsUtil; + +/// An HTTP/2 server that proxies H2 streams to configurable backends via path-based routing +public class OmnibusH2Server implements Managed { + + private static final Logger logger = LoggerFactory.getLogger(OmnibusH2Server.class); + + private static final OmnibusConnectionCounterHandler CONNECT_COUNTER = new OmnibusConnectionCounterHandler(); + private static final OmnibusExceptionHandler HANDSHAKE_EXCEPTION_HANDLER = + new OmnibusExceptionHandler("omnibus-handshake", List.of(SocketException.class, DecoderException.class, IOException.class)); + private static final OmnibusExceptionHandler SESSION_EXCEPTION_HANDLER = + new OmnibusExceptionHandler("omnibus-session", List.of(Http2Exception.class)); + private static final String IDLE_DISCONNECT_COUNTER_NAME = MetricsUtil.name(OmnibusH2Server.class, "idleDisconnect"); + + private final @Nullable Mapping sslContextBySni; + private final OmnibusRouter router; + private final Duration idleTimeout; + private final DefaultEventLoopGroup localEventLoopGroup; + private final NioEventLoopGroup nioEventLoopGroup; + private final SocketAddress bindAddress; + + private Channel serverChannel; + + /// Create an omnibus server + /// + /// @param sslContextBySni If not null, a mapping between domain (SNI) and the appropriate SslContext to use for + /// that SNI. If null, the server will not include TLS (h2c with prior-knowledge) + /// @param nioEventLoopGroup Event loop to use for all NIO channel pipelines + /// @param localEventLoopGroup Event loop to use for all local channel pipelines + /// @param bindAddress The address the server should listen on + /// @param router How the server should select backends based on request paths + public OmnibusH2Server( + final @Nullable Mapping sslContextBySni, + final NioEventLoopGroup nioEventLoopGroup, + final DefaultEventLoopGroup localEventLoopGroup, + final SocketAddress bindAddress, + final OmnibusRouter router, + final Duration idleTimeout) { + this.sslContextBySni = sslContextBySni; + this.nioEventLoopGroup = nioEventLoopGroup; + this.localEventLoopGroup = localEventLoopGroup; + this.bindAddress = bindAddress; + this.router = router; + this.idleTimeout = idleTimeout; + } + + @Override + public void start() throws Exception { + if (this.sslContextBySni == null) { + logger.warn("No SSL configuration provided for OmnibusH2Server, serving h2c"); + } + + if (ByteBufAllocator.DEFAULT instanceof ByteBufAllocatorMetricProvider alloc) { + Metrics.gauge(MetricsUtil.name(OmnibusH2Server.class, "nettyUsedDirectMemory"), + alloc, + allocator -> allocator.metric().usedDirectMemory()); + Metrics.gauge(MetricsUtil.name(OmnibusH2Server.class, "nettyUsedHeapMemory"), + alloc, + allocator -> allocator.metric().usedHeapMemory()); + } + + final ServerBootstrap bootstrap = new ServerBootstrap() + .group(nioEventLoopGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel ch) { + ch.pipeline().addLast(new IdleStateHandler(0, 0, idleTimeout.toMillis(), TimeUnit.MILLISECONDS)); + ch.pipeline().addLast(new SimpleUserEventChannelHandler() { + @Override + protected void eventReceived(final ChannelHandlerContext ctx, final IdleStateEvent evt) { + Metrics.counter(IDLE_DISCONNECT_COUNTER_NAME, "type", evt.state().name()).increment(); + ctx.close(); + } + }); + ch.pipeline().addLast(CONNECT_COUNTER); + ch.pipeline().addLast(new ProxyProtocolHandler()); + ch.pipeline().addLast(new ProxyMessageAttributeSetterHandler()); + if (sslContextBySni == null) { + configureH2Pipeline(ch.pipeline()); + } else { + ch.pipeline().addLast(new SniHandler(sslContextBySni)); + ch.pipeline().addLast(new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) { + @Override + protected void configurePipeline(final ChannelHandlerContext ctx, final String protocol) { + if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) { + // HTTP/2 should be enforced by our ALPN settings + logger.error("Unsupported protocol negotiated: {}, closing connection", protocol); + ctx.close(); + return; + } + configureH2Pipeline(ctx.pipeline()); + } + }); + ch.pipeline().addLast(HANDSHAKE_EXCEPTION_HANDLER); + } + } + }); + + serverChannel = bootstrap.bind(bindAddress).sync().channel(); + logger.info("Omnibus server listening on {}", getLocalAddress()); + } + + @VisibleForTesting + InetSocketAddress getLocalAddress() { + return (InetSocketAddress) serverChannel.localAddress(); + } + + @Override + public void stop() { + if (serverChannel != null) { + logger.info("Stopping omnibus server"); + serverChannel.close().syncUninterruptibly(); + logger.info("Omnibus server stopped"); + } + } + + private void configureH2Pipeline(final ChannelPipeline pipeline) { + // Advertise support for RFC-8441 extended connect + final Http2Settings settings = Http2Settings.defaultSettings().connectProtocolEnabled(true); + pipeline.addLast(Http2FrameCodecBuilder.forServer().initialSettings(settings).build()); + pipeline.addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(final Http2StreamChannel ch) { + ch.pipeline().addLast(new OmnibusH2StreamHandler(nioEventLoopGroup, localEventLoopGroup, router)); + } + })); + pipeline.addLast(SESSION_EXCEPTION_HANDLER); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java new file mode 100644 index 000000000..81c4cc53e --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2StreamHandler.java @@ -0,0 +1,230 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/// Handler added on each newly created [Http2StreamChannel] on an H2 connection. Inspects the [Http2HeadersFrame] and +/// determines which backend to forward the stream to, and then proxies frames to and from the backend. +/// +/// When this handler receives an H2 header for a new stream-id=X on our parent H2 connection it will +/// - Receive the stream-id=X header and check the path to determine the correct backend +/// - Make a new H2 connection to the backend +/// - Forward the header with stream-id=Y to the backend +/// - Install a [H2FrameProxyHandler] on the backend stream pipeline that forwards the received stream-id=Y frames from +/// the backend back to the client on stream-id=X +/// - Install a [H2FrameProxyHandler] on the client stream pipeline forwards the received stream-id=X frames from the +/// client to the backend on stream-id=Y +public class OmnibusH2StreamHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(OmnibusH2StreamHandler.class); + + private static final OmnibusExceptionHandler BACKEND_CONNECTION_EXCEPTION_HANDLER = + new OmnibusExceptionHandler("backend-connection", List.of()); + private static final String BACKEND_STREAM_COUNTER_NAME = name(OmnibusH2StreamHandler.class, "backendStream"); + private static final String BACKEND_CONNECT_DURATION_NAME = name(OmnibusH2StreamHandler.class, + "backendConnectDuration"); + private static final String BACKEND_TAG = "backend"; + + private final OmnibusRouter router; + + private final DefaultEventLoopGroup localEventLoopGroup; + private final NioEventLoopGroup nioEventLoopGroup; + + public OmnibusH2StreamHandler( + final NioEventLoopGroup nioEventLoopGroup, + final DefaultEventLoopGroup localEventLoopGroup, + final OmnibusRouter router) { + this.router = router; + this.localEventLoopGroup = localEventLoopGroup; + this.nioEventLoopGroup = nioEventLoopGroup; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (!(msg instanceof Http2HeadersFrame headersFrame)) { + logger.warn("Expected initial HEADERS frame but got {}", msg.getClass().getSimpleName()); + ReferenceCountUtil.release(msg); + ctx.close(); + return; + } + + // We don't expect headers frames to come with manually managed memory attached. Assert this in case this changes + // in the future, but for now we don't have to worry about freeing the headers frame + assert !(headersFrame instanceof ReferenceCounted); + + // Disable reading from the client because we want to wait until we make the backend connection and install the + // forwarding handler before processing any more frames. + ctx.channel().config().setAutoRead(false); + + // Select the target backend based on the path + final String path = Optional.ofNullable(headersFrame.headers().path()).map(CharSequence::toString).orElse(""); + final SocketAddress target = router.match(path); + final String backendTag = target.toString(); + + Metrics.counter(BACKEND_STREAM_COUNTER_NAME, BACKEND_TAG, backendTag).increment(); + + // Set X-Forwarded-For from the PROXY protocol header if present, otherwise via the remote address + final InetAddress proxyRemoteAddress = ctx.channel().parent() + .attr(ProxyMessageAttributeSetterHandler.PROXY_REMOTE_ADDRESS) + .get(); + headersFrame.headers().set("x-forwarded-for", proxyRemoteAddress != null + ? proxyRemoteAddress.getHostAddress() + : ((InetSocketAddress) ctx.channel().remoteAddress()).getHostString()); + + // Make a new H2 connection to the target backend + final Timer.Sample connectSample = Timer.start(); + new Bootstrap() + .group(selectEventLoop(ctx, target)) + .channel(target instanceof LocalAddress ? LocalChannel.class : NioSocketChannel.class) + .handler(new ChannelInitializer<>() { + @Override + protected void initChannel(final Channel ch) { + ch.pipeline() + .addLast(Http2FrameCodecBuilder.forClient().initialSettings(Http2Settings.defaultSettings()).build()); + // Http2MultiplexHandler takes handler that is added to new inbound streams. A client Http2MultiplexHandler + // like we're defining here should never receive an inbound H2 stream so we can just pass a noop handler + ch.pipeline().addLast(new Http2MultiplexHandler(new NoopInboundStreamHandler())); + ch.pipeline().addLast(BACKEND_CONNECTION_EXCEPTION_HANDLER); + } + }) + .connect(target) + .addListener((ChannelFuture connectFuture) -> { + connectSample.stop(Timer.builder(BACKEND_CONNECT_DURATION_NAME) + .tag(BACKEND_TAG, backendTag) + .tag("outcome", connectFuture.isSuccess() ? "success" : "failure") + .register(Metrics.globalRegistry)); + + if (!connectFuture.isSuccess()) { + // Close the client stream with a 502: Bad Gateway if the backend wasn't available + logger.warn("Failed to connect to backend {}", target, connectFuture.cause()); + ctx.channel() + .writeAndFlush(new DefaultHttp2HeadersFrame( + new DefaultHttp2Headers().status("502"), true)) + .addListener(ChannelFutureListener.CLOSE); + return; + } + + // Connected, open a new H2 stream to the backend so we can proxy the client's frames + logger.trace("Opening a HTTP/2 stream to the backend {}", target); + final Channel backendConnection = connectFuture.channel(); + createBackendProxyStream(ctx, backendConnection, headersFrame); + }); + } + + /// Create a proxy stream on the provided `backendConnection` that forwards H2 frames to/from the client H2 stream. + /// + /// @param clientStreamCtx The context for a client H2 stream that targets the backend + /// @param backendConnection An established H2 connection [Channel], on which a new h2 stream will be opened + /// @param headersFrame The first `headersFrame` from the client h2 stream that should be forwarded to the new + /// backend stream + private void createBackendProxyStream( + final ChannelHandlerContext clientStreamCtx, + final Channel backendConnection, + final Http2HeadersFrame headersFrame) { + new Http2StreamChannelBootstrap(backendConnection) + // Forwards response frames from the backend back to the client stream + .handler(new H2FrameProxyHandler(clientStreamCtx.channel(), "responseStream")) + .open() + .addListener((io.netty.util.concurrent.Future streamFuture) -> { + if (!streamFuture.isSuccess()) { + logger.warn("Failed to open backend stream", streamFuture.cause()); + clientStreamCtx.channel() + .writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR)) + .addListener(ChannelFutureListener.CLOSE); + backendConnection.close(); + return; + } + + final Http2StreamChannel backendStream = streamFuture.getNow(); + + // Close the entire H2 connection whenever the stream we just opened closes. We only plan on using + // a single stream on this connection. + backendStream.closeFuture().addListener(_ -> backendConnection.close()); + + // We're going to modify the inbound H2 stream channel, which runs on a different eventloop than the + // outbound channel we've made to the backend. We have to submit our updates back to the inbound + // channel's event loop for thread safety + clientStreamCtx.channel().eventLoop().execute(() -> { + + if (!clientStreamCtx.channel().isActive()) { + // The client disconnected already and the client pipeline is already torn down. + backendConnection.close(); + return; + } + + // Install proxy on client stream, remove this handler, then fire the buffered headers through the proxy + clientStreamCtx.pipeline().replace( + OmnibusH2StreamHandler.this, + "backend-to-client-proxy", + new H2FrameProxyHandler(backendStream, "requestStream")); + clientStreamCtx.channel().pipeline().fireChannelRead(headersFrame); + + // Resume inbound reads, which should now be forwarded + clientStreamCtx.channel().config().setAutoRead(true); + }); + }); + } + + private EventLoopGroup selectEventLoop(final ChannelHandlerContext inboundCtx, SocketAddress target) { + final boolean localInbound = inboundCtx.channel() instanceof LocalChannel; + final boolean localTarget = target instanceof LocalAddress; + + // If the inbound eventloop matches the target type, we can just reuse the inbound's event loop + if (localInbound == localTarget) { + return inboundCtx.channel().eventLoop(); + } + + return localTarget ? this.localEventLoopGroup : this.nioEventLoopGroup; + } + + private static class NoopInboundStreamHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRegistered(final ChannelHandlerContext ctx) { + logger.error("Inbound stream handler was registered when no inbound streams expected"); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + logger.error("Received unexpected message: {} on inbound stream from backend", msg); + super.channelRead(ctx, msg); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java new file mode 100644 index 000000000..1365a0c2b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusRouter.java @@ -0,0 +1,30 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import java.net.SocketAddress; +import java.util.List; + +public class OmnibusRouter { + + public record OmnibusRoute(String prefix, SocketAddress backend) {} + + private final List prefixRoutes; + private final SocketAddress defaultBackend; + + public OmnibusRouter(final List prefixRoutes, final SocketAddress defaultBackend) { + this.prefixRoutes = prefixRoutes; + this.defaultBackend = defaultBackend; + } + + SocketAddress match(final String path) { + for (final OmnibusRoute route : prefixRoutes) { + if (path.startsWith(route.prefix)) { + return route.backend; + } + } + return defaultBackend; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java new file mode 100644 index 000000000..97d13e692 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyMessageAttributeSetterHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.util.AttributeKey; +import java.net.InetAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/// Reads the decoded [HAProxyMessage], stores the source address as a channel attribute, and removes itself. +class ProxyMessageAttributeSetterHandler extends ChannelInboundHandlerAdapter { + private static final Logger logger = LoggerFactory.getLogger(ProxyMessageAttributeSetterHandler.class); + + /// Attribute for the remote address extracted from a proxy protocol header + static final AttributeKey PROXY_REMOTE_ADDRESS = AttributeKey.newInstance("proxyRemoteAddress"); + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + if (!(msg instanceof HAProxyMessage proxyMessage)) { + ctx.pipeline().remove(this); + ctx.fireChannelRead(msg); + return; + } + + try { + final String sourceAddress = proxyMessage.sourceAddress(); + if (sourceAddress != null) { + ctx.channel().attr(PROXY_REMOTE_ADDRESS).set(InetAddress.getByName(sourceAddress)); + } else { + logger.warn("PROXY protocol message has no source address"); + } + } finally { + proxyMessage.release(); + ctx.pipeline().remove(this); + } + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java new file mode 100644 index 000000000..7b4b3b3c3 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandler.java @@ -0,0 +1,48 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.micrometer.core.instrument.Metrics; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.ProtocolDetectionResult; +import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; +import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import java.util.List; + +class ProxyProtocolHandler extends ByteToMessageDecoder { + + private static final String PROXY_PROTOCOL_DETECTED_NAME = + name(ProxyProtocolHandler.class, "proxyProtocol"); + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out) { + + // This does not advance the read index, so the bytes we accumulate via ByteToMessageDecoder are always forwarded + // once we get enough (either to an HAProxyMessageDecoder, or just to the rest of the pipeline) + final ProtocolDetectionResult detected = HAProxyMessageDecoder.detectProtocol(in); + + switch (detected.state()) { + case NEEDS_MORE_DATA: + break; + case DETECTED: + // There is a valid proxy-protocol header. Replace ourselves with the actual decoder (which will forward our + // accumulated bytes to the decoder via handlerRemoved) + Metrics.counter(PROXY_PROTOCOL_DETECTED_NAME, "detected", "true").increment(); + ctx.pipeline().replace(this, "haproxy-decoder", new HAProxyMessageDecoder()); + break; + case INVALID: + // No header, we can just forward any bytes we've accumulated. + Metrics.counter(PROXY_PROTOCOL_DETECTED_NAME, "detected", "false").increment(); + ctx.pipeline().remove(this); + break; + } + } + + +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/SniMapper.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/SniMapper.java new file mode 100644 index 000000000..cb5d9299d --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/net/SniMapper.java @@ -0,0 +1,164 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import com.google.common.annotations.VisibleForTesting; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.Mapping; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.Key; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateParsingException; +import java.security.cert.X509Certificate; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SniMapper { + + private static final Logger logger = LoggerFactory.getLogger(SniMapper.class); + + private SniMapper() { + } + + /// Build a [Mapping] from a [KeyStore] that maps from domain (via SAN) to [io.netty.handler.ssl.SslContext] that + /// can be used with an [io.netty.handler.ssl.SniHandler]. The provided keystore may contain multiple certificates + /// for a single domain, all matching certificates will be included in the corresponding SslContext. The domain for + /// a certificate is only determined by the SAN and all certificates must have a SAN. The returned [Mapping] returns + /// an arbitrary set of certificates if none of the certificates in the keystore match a requested domain, as + /// permitted by RFC-6066. + /// + /// @param keyStorePath The path to the [KeyStore] + /// @param keyStorePassword The password for the keyStore + /// @return A [Mapping] that maps domains to the corresponding [SslContext] containing the certificates for that + /// domain + public static Mapping buildSniMapping(final String keyStorePath, final String keyStorePassword) + throws IOException { + try (final FileInputStream fis = new FileInputStream(keyStorePath)) { + return buildSniMapping(fis, keyStorePassword); + } + } + + @VisibleForTesting + static Mapping buildSniMapping(final InputStream keyStore, final String keyStorePassword) + throws IOException { + try { + final Map domainKeyStores = partitionByDomain(keyStore, keyStorePassword.toCharArray()); + final Map sslContextsByDomain = new HashMap<>(); + for (final Map.Entry entry : domainKeyStores.entrySet()) { + final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(entry.getValue(), keyStorePassword.toCharArray()); + sslContextsByDomain.put(entry.getKey(), buildSslContext(kmf)); + } + + // Netty expects the SNI mapping to always return an SslContext. Per RFC-6066 it's valid to continue the handshake + // on an SNI mismatch, it's the client's responsibility to check the returned certificate's SNI. We sort first so + // our choice of certificate is deterministic. + final SslContext defaultSslContext = sslContextsByDomain.entrySet().stream() + .min(Map.Entry.comparingByKey()) + .orElseThrow(() -> new IllegalArgumentException("Key store contained no certificates")) + .getValue(); + + logger.info("Loaded TLS contexts for domains: {}", sslContextsByDomain.keySet()); + return hostname -> sslContextsByDomain.getOrDefault(hostname, defaultSslContext); + } catch (NoSuchAlgorithmException | KeyStoreException | CertificateException | UnrecoverableKeyException e) { + throw new IOException("Failed to load keystore", e); + } + } + + private static SslContext buildSslContext(final KeyManagerFactory kmf) throws SSLException { + return SslContextBuilder.forServer(kmf) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)) + .protocols("TLSv1.3") + .build(); + } + + private static Map partitionByDomain(final InputStream keystoreStream, final char[] keystorePassword) + throws KeyStoreException, CertificateException, UnrecoverableKeyException, IOException, NoSuchAlgorithmException { + + final KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(keystoreStream, keystorePassword); + + // Group key entries by the domain(s) in each certificate's SANs + final Map domainKeyStores = new HashMap<>(); + for (final String alias : Collections.list(keyStore.aliases())) { + if (!keyStore.isKeyEntry(alias)) { + continue; + } + + final Certificate[] chain = keyStore.getCertificateChain(alias); + if (chain == null || chain.length == 0) { + continue; + } + + final X509Certificate leaf = (X509Certificate) chain[0]; + final Key key = keyStore.getKey(alias, keystorePassword); + + for (final String domain : getDnsNames(leaf)) { + domainKeyStores + .computeIfAbsent(domain, _ -> newEmptyKeyStore()) + .setKeyEntry(alias, key, keystorePassword, chain); + } + } + + if (domainKeyStores.isEmpty()) { + throw new IOException("Keystore contains no usable key entries with DNS names"); + } + return domainKeyStores; + + } + + private static KeyStore newEmptyKeyStore() { + try { + final KeyStore ks = KeyStore.getInstance("PKCS12"); + ks.load(null, null); + return ks; + } catch (KeyStoreException | IOException | NoSuchAlgorithmException | CertificateException e) { + // All Java runtime implementations are required to support PKCS12, and we aren't loading anything from disk + // so an exception here is impossible. + throw new AssertionError("Failed to create empty keystore", e); + } + } + + /// Extract all DNS-type SAN names on this certificate + private static List getDnsNames(final X509Certificate cert) throws CertificateParsingException, IOException { + final Collection> sans = cert.getSubjectAlternativeNames(); + if (sans == null) { + throw new IOException("Certificate did not have SAN extension"); + } + final List dnsSans = sans.stream() + // GeneralName type 2 = dNSName. See getSubjectAlternativeNames + .filter(san -> (int) san.getFirst() == 2) + .map(san -> (String) san.get(1)) + .map(s -> s.toLowerCase(Locale.ROOT)) + .toList(); + if (dnsSans.isEmpty()) { + throw new IOException("Certificate did not have a DNS SAN entry"); + } + return dnsSans; + } + +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/jetty/JettyHttpConfigurationCustomizer.java b/service/src/main/java/org/whispersystems/textsecuregcm/jetty/JettyHttpConfigurationCustomizer.java index 3415c0086..2f44aefc6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/jetty/JettyHttpConfigurationCustomizer.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/jetty/JettyHttpConfigurationCustomizer.java @@ -40,8 +40,6 @@ public class JettyHttpConfigurationCustomizer implements Container.Listener, Lif httpConfiguration.setNotifyRemoteAsyncErrors(false); } } - - c.addBean(new JettyConnectionMetrics(Metrics.globalRegistry)); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java index d57ab65f3..302bb8ce4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java @@ -3,6 +3,7 @@ package org.whispersystems.textsecuregcm.redis; import com.google.common.annotations.VisibleForTesting; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.lettuce.core.ClientOptions; +import io.lettuce.core.MaintNotificationsConfig; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; @@ -57,20 +58,10 @@ public class FaultTolerantRedisClient { this.name = name; - // Lettuce will issue a CLIENT SETINFO command unconditionally if these fields are set (and they are by default), - // which can generate a bunch of spurious warnings in versions of Redis before 7.2.0. - // - // See: - // - // - https://github.com/redis/lettuce/pull/2823 - // - https://github.com/redis/lettuce/issues/2817 - redisUri.setClientName(null); - redisUri.setLibraryName(null); - redisUri.setLibraryVersion(null); - this.redisClient = RedisClient.create(clientResourcesBuilder.build(), redisUri); final ClientOptions.Builder clientOptionsBuilder = ClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .maintNotificationsConfig(MaintNotificationsConfig.disabled()) // for asynchronous commands .timeoutOptions(TimeoutOptions.builder() .fixedTimeout(commandTimeout) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index e0fba6ef7..ae277b6cf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -9,6 +9,7 @@ import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; import io.lettuce.core.ClientOptions; +import io.lettuce.core.MaintNotificationsConfig; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.TimeoutOptions; @@ -69,28 +70,15 @@ public class FaultTolerantRedisClusterClient { this.name = name; - // Lettuce will issue a CLIENT SETINFO command unconditionally if these fields are set (and they are by default), - // which can generate a bunch of spurious warnings in versions of Redis before 7.2.0. - // - // See: - // - // - https://github.com/redis/lettuce/pull/2823 - // - https://github.com/redis/lettuce/issues/2817 - redisUris.forEach(redisUri -> { - redisUri.setClientName(null); - redisUri.setLibraryName(null); - redisUri.setLibraryVersion(null); - }); - final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker(name, circuitBreakerConfigurationName); this.clusterClient = RedisClusterClient.create( - clientResourcesBuilder.nettyCustomizer(lettuceShardCircuitBreaker). - build(), + clientResourcesBuilder.nettyCustomizer(lettuceShardCircuitBreaker) + .build(), redisUris); - final ClusterClientOptions.Builder clusterClientOptionsBuilder = ClusterClientOptions.builder() + final ClusterClientOptions.Builder clusterClientOptionsBuilder = (ClusterClientOptions.Builder) ClusterClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) .validateClusterNodeMembership(false) .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() @@ -100,7 +88,8 @@ public class FaultTolerantRedisClusterClient { .timeoutOptions(TimeoutOptions.builder() .fixedTimeout(commandTimeout) .build()) - .publishOnScheduler(true); + .publishOnScheduler(true) + .maintNotificationsConfig(MaintNotificationsConfig.disabled()); NettyUtil.setSocketTimeoutsIfApplicable(clusterClientOptionsBuilder); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/WhisperServerServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/WhisperServerServiceTest.java index 011923324..48e8819eb 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/WhisperServerServiceTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/WhisperServerServiceTest.java @@ -13,27 +13,28 @@ import io.dropwizard.testing.ConfigOverride; import io.dropwizard.testing.junit5.DropwizardAppExtension; import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; import io.dropwizard.util.Resources; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.MetadataUtils; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.core.Response; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; import java.net.URI; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.client.WebSocketClient; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,14 +42,18 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.whispersystems.textsecuregcm.configuration.OpenTelemetryConfiguration; +import org.signal.chat.account.AccountsAnonymousGrpc; +import org.signal.chat.account.CheckAccountExistenceRequest; +import org.signal.chat.account.CheckAccountExistenceResponse; +import org.signal.chat.common.IdentityType; +import org.signal.chat.common.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.NoopAwsSdkMetricPublisher; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; import org.whispersystems.textsecuregcm.tests.util.TestWebsocketListener; import org.whispersystems.textsecuregcm.util.AttributeValues; import org.whispersystems.textsecuregcm.util.HeaderUtils; -import org.whispersystems.textsecuregcm.util.Util; +import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.websocket.messages.WebSocketResponseMessage; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; @@ -65,6 +70,7 @@ class WhisperServerServiceTest { System.setProperty("secrets.bundle.filename", Resources.getResource("config/test-secrets-bundle.yml").getPath()); } + private static final int OMNIBUS_PORT = findAvailablePort(); private static WebSocketClient webSocketClient; private static WebSocketClient h2WebSocketClient; @@ -72,7 +78,8 @@ class WhisperServerServiceTest { private static final DropwizardAppExtension EXTENSION = new DropwizardAppExtension<>( WhisperServerService.class, Resources.getResource("config/test.yml").getPath(), // Tables will be created by the local DynamoDbExtension - ConfigOverride.config("dynamoDbClient.initTables", "false")); + ConfigOverride.config("dynamoDbClient.initTables", "false"), + ConfigOverride.config("grpc.port", String.valueOf(OMNIBUS_PORT))); @RegisterExtension public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension(DynamoDbExtensionSchema.Tables.values()); @@ -200,6 +207,51 @@ class WhisperServerServiceTest { .build()); } + + @Test + void omnibusWebsocket() throws Exception { + final HTTP2Client http2Client = new HTTP2Client(new ClientConnector()); + final WebSocketClient h2WebSocketClient = + new WebSocketClient(new HttpClient(new HttpClientTransportOverHTTP2(http2Client))); + h2WebSocketClient.start(); + + final TestWebsocketListener testWebsocketListener = new TestWebsocketListener(); + final Session session = h2WebSocketClient.connect(testWebsocketListener, + URI.create(String.format("ws://localhost:%d/v1/websocket/", OMNIBUS_PORT))) + .join(); + final WebSocketResponseMessage keepAlive = testWebsocketListener.doGet("/v1/keepalive").join(); + assertEquals(200, keepAlive.getStatus()); + final WebSocketResponseMessage whoami = testWebsocketListener.doGet("/v1/accounts/whoami").join(); + assertEquals(401, whoami.getStatus()); + session.close(); + h2WebSocketClient.stop(); + } + + @Test + void omnibusGrpc() throws Exception { + final ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", OMNIBUS_PORT) + .usePlaintext() + .build(); + + final Metadata metadata = new Metadata(); + metadata.put(Metadata.Key.of("X-Forwarded-For", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1"); + + final AccountsAnonymousGrpc.AccountsAnonymousBlockingStub stub = AccountsAnonymousGrpc + .newBlockingStub(channel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); + final CheckAccountExistenceResponse response = stub.checkAccountExistence( + CheckAccountExistenceRequest.newBuilder() + .setServiceIdentifier(ServiceIdentifier.newBuilder() + .setIdentityType(IdentityType.IDENTITY_TYPE_ACI) + .setUuid(UUIDUtil.toByteString(UUID.randomUUID())) + .build()) + .build()); + + assertFalse(response.getAccountExists()); + channel.shutdownNow(); + channel.awaitTermination(1, TimeUnit.SECONDS); + } + private static DynamoDbClient getDynamoDbClient() { final AwsCredentialsProvider awsCredentialsProvider = EXTENSION.getConfiguration().getAwsCredentialsConfiguration() .build(); @@ -208,4 +260,12 @@ class WhisperServerServiceTest { .buildSyncClient(awsCredentialsProvider, new NoopAwsSdkMetricPublisher()); } + private static int findAvailablePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/AbstractLeakDetectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/AbstractLeakDetectionTest.java new file mode 100644 index 000000000..e219e4f0f --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/AbstractLeakDetectionTest.java @@ -0,0 +1,25 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX License Identifier: AGPL 3.0 only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import io.netty.util.ResourceLeakDetector; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public abstract class AbstractLeakDetectionTest { + + private static ResourceLeakDetector.Level originalResourceLeakDetectorLevel; + + @BeforeAll + static void setLeakDetectionLevel() { + originalResourceLeakDetectorLevel = ResourceLeakDetector.getLevel(); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + } + + @AfterAll + static void restoreLeakDetectionLevel() { + ResourceLeakDetector.setLevel(originalResourceLeakDetectorLevel); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandlerTest.java new file mode 100644 index 000000000..93ba8f82f --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/H2FrameProxyHandlerTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.embedded.EmbeddedChannel; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; + +class H2FrameProxyHandlerTest extends AbstractLeakDetectionTest { + @Test + void proxyWritabilityChanged() { + final EmbeddedChannel target = new EmbeddedChannel(); + final EmbeddedChannel source = new EmbeddedChannel(new H2FrameProxyHandler(target, "test")); + + // Set a tiny watermark to guarantee an unflushed write sets to unwritable, and then buffer some data + final byte[] bufferedData = "8 bytes!".getBytes(StandardCharsets.UTF_8); + target.config().setWriteBufferWaterMark(new WriteBufferWaterMark(4, 8)); + target.write(bufferedData); + + assertNull(target.readOutbound(), "nothing should be written without a flush"); + assertFalse(target.isWritable(), "target should be unwritable because we've buffered more than the high watermark"); + assertFalse(source.config().isAutoRead(), "source should not read because the target is unwritable"); + + target.flush(); + assertTrue(target.isWritable(), "after a flush, the target should be writable"); + assertTrue(source.config().isAutoRead(), "after the target becomes writable, autoRead should be enabled"); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2ServerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2ServerTest.java new file mode 100644 index 000000000..660ff428c --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/OmnibusH2ServerTest.java @@ -0,0 +1,600 @@ +package org.whispersystems.textsecuregcm.grpc.net; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.grpc.netty.shaded.io.netty.channel.ChannelOption; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleUserEventChannelHandler; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioChannelOption; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.haproxy.HAProxyCommand; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; +import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2ResetFrame; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.ReferenceCountUtil; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class OmnibusH2ServerTest extends AbstractLeakDetectionTest { + private static final String KEYSTORE_PASSWORD = "password"; + + // Paths that start with PREFIX should go to the prefix backend, everything else to default. + private static final String PREFIX_BACKEND_IDENTITY = "prefix-backend"; + private static final String PREFIX = "/v1/prefix"; + private static final String DEFAULT_BACKEND_IDENTITY = "default-backend"; + + private final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); + private final DefaultEventLoopGroup localEventLoopGroup = new DefaultEventLoopGroup(); + + private Channel defaultBackend; + private Channel prefixBackend; + private CompletableFuture backendConnection; + + private OmnibusH2Server server; + + @BeforeEach + void setUp() throws Exception { + // Start two H2C backend servers that echo a response with their identity + defaultBackend = startH2CServer(true, DEFAULT_BACKEND_IDENTITY); + prefixBackend = startH2CServer(false, PREFIX_BACKEND_IDENTITY); + backendConnection = new CompletableFuture<>(); + + // self-signed TLS context for the frontend loaded from test keyStore + final InputStream keyStore = OmnibusH2ServerTest.class.getResourceAsStream("omnibus-h2-server-test-keystore.p12"); + + server = new OmnibusH2Server( + SniMapper.buildSniMapping(keyStore, KEYSTORE_PASSWORD), + nioEventLoopGroup, + localEventLoopGroup, + new InetSocketAddress("127.0.0.1", 0), + new OmnibusRouter( + List.of(new OmnibusRouter.OmnibusRoute(PREFIX, prefixBackend.localAddress())), + defaultBackend.localAddress()), + Duration.ofMinutes(1)); + + server.start(); + } + + @AfterEach + void tearDown() throws Exception { + server.stop(); + defaultBackend.close().sync(); + prefixBackend.close().sync(); + localEventLoopGroup.shutdownGracefully(1, 1000, TimeUnit.MILLISECONDS).sync(); + nioEventLoopGroup.shutdownGracefully(1, 1000, TimeUnit.MILLISECONDS).sync(); + } + + + @Test + void defaultBackend() { + final String response = sendRequestThroughOmnibus("/a/different/path"); + assertEquals(DEFAULT_BACKEND_IDENTITY, response); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void forwardedForHeader(final boolean usePpv2) { + final String expectedSource = usePpv2 ? "127.0.0.123" : "127.0.0.1"; + final HAProxyMessage proxyMessage = usePpv2 + ? new HAProxyMessage(HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4, expectedSource, "127.0.0.2", 1234, 5678) + : null; + final Channel h2Connection = connectToOmnibus(null, proxyMessage); + final String xForwardedFor = sendRequestThroughOmnibus(h2Connection, "/forwarded-for"); + assertEquals(expectedSource, xForwardedFor); + } + + @ParameterizedTest + @ValueSource(strings = {"/v1/prefix", "/v1/prefix/", "/v1/prefix/other"}) + void prefixBackend(final String path) { + final String response = sendRequestThroughOmnibus(path); + assertEquals(PREFIX_BACKEND_IDENTITY, response); + } + + @Test + void multipleStreamsOnSameConnection() { + final Channel h2Connection = connectToOmnibus(); + final int numStreams = 10; + + // Create concurrent streams to both backends on the same connection simultaneously + @SuppressWarnings("rawtypes") + final CompletableFuture[] futures = IntStream.range(0, numStreams) + .mapToObj(i -> CompletableFuture.supplyAsync(() -> + sendRequestThroughOmnibus( + h2Connection, + i % 2 == 0 ? PREFIX : "/v1/other"))) + .toArray(CompletableFuture[]::new); + + // Ensure we get the response from the correct backend for each stream + CompletableFuture.allOf(futures).join(); + for (int i = 0; i < numStreams; i++) { + assertEquals( + i % 2 == 0 ? PREFIX_BACKEND_IDENTITY : DEFAULT_BACKEND_IDENTITY, + futures[i].resultNow()); + } + } + + @Test + void backendDownStreamReset() { + // Kill the default backend so connection attempts from the omnibus fail + defaultBackend.close().syncUninterruptibly(); + + final Channel h2Connection = connectToOmnibus(); + final CompletableFuture headersFuture = new CompletableFuture<>(); + final Http2StreamChannel stream = new Http2StreamChannelBootstrap(h2Connection) + .handler(new HeadersCollectorHandler(headersFuture)) + .open() + .syncUninterruptibly() + .getNow(); + + final Http2Headers headers = new DefaultHttp2Headers() + .method("POST") + .path("/test") + .scheme("https") + .authority("localhost"); + stream.writeAndFlush(new DefaultHttp2HeadersFrame(headers, true)); + final Http2HeadersFrame responseHeaders = headersFuture.join(); + assertEquals("502", responseHeaders.headers().status().toString()); + + // Stream is dead, but connection should stay alive + assertFalse( + h2Connection.closeFuture().awaitUninterruptibly(5, TimeUnit.MILLISECONDS), + "connection should stay open"); + assertTrue(h2Connection.isOpen()); + + h2Connection.close().syncUninterruptibly(); + } + + @ParameterizedTest + @ValueSource(strings = {"/goaway", "/reset"}) + void backendCloseClosesClientStream(final String path) { + final Channel h2Connection = connectToOmnibus(); + final CompletableFuture resetFuture = new CompletableFuture<>(); + final Http2StreamChannel stream = new Http2StreamChannelBootstrap(h2Connection) + .handler(new RstCollectorHandler(resetFuture)) + .open() + .syncUninterruptibly() + .getNow(); + + final Http2Headers headers = new DefaultHttp2Headers() + .method("POST") + // Triggers the server stream handler to either GOAWAY+close or send an RST based on the path + .path(path) + .scheme("https") + .authority("localhost"); + stream.writeAndFlush(new DefaultHttp2HeadersFrame(headers, true)); + assertEquals(Http2Error.CANCEL.code(), resetFuture.join().errorCode()); + + // client<->omnibus h2 connection stays open after a backend close/rst + assertTrue(h2Connection.isActive()); + } + + @Test + void queuedDataFrames() throws Exception { + final Channel h2Connection = connectToOmnibus(); + final CompletableFuture responseFuture = new CompletableFuture<>(); + final Http2StreamChannel stream = new Http2StreamChannelBootstrap(h2Connection) + .handler(new ResponseCollectorHandler(responseFuture)) + .open() + .syncUninterruptibly() + .getNow(); + + final Http2Headers headers = new DefaultHttp2Headers() + .method("POST") + .path("/test") + .scheme("https") + .authority("localhost"); + + final int numFrames = 64; + + // Omnibus should handle queueing up frames while connecting to the backend if we blast all the frames right away + stream.write(new DefaultHttp2HeadersFrame(headers, false)); + final StringBuilder expectedBuilder = new StringBuilder(); + for (int i = 0; i < numFrames; i++) { + final String chunk = String.format("chunk-%03d;", i); + expectedBuilder.append(chunk); + final boolean endStream = (i == numFrames - 1); + stream.write(new DefaultHttp2DataFrame( + Unpooled.copiedBuffer(chunk, StandardCharsets.UTF_8), endStream)); + } + stream.flush(); + final String expected = expectedBuilder.toString(); + + final String response = responseFuture.get(10, TimeUnit.SECONDS); + assertEquals(expected, response); + + h2Connection.close().syncUninterruptibly(); + } + + @Test + void clientDisconnectClosesBackendConnections() throws Exception { + final Channel h2Connection = connectToOmnibus(); + + final Http2StreamChannelBootstrap streamBootstrap = new Http2StreamChannelBootstrap(h2Connection); + final Http2StreamChannel stream = streamBootstrap + .handler(new ResponseCollectorHandler(new CompletableFuture<>())) + .open() + .syncUninterruptibly() + .getNow(); + + // Write an endStream=false header so the stream stays open + final Http2Headers headers = new DefaultHttp2Headers() + .method("POST") + .path("/test") + .scheme("https") + .authority("localhost"); + stream.writeAndFlush(new DefaultHttp2HeadersFrame(headers, false)).syncUninterruptibly(); + final Channel backendServerChannel = backendConnection.join(); + assertFalse( + backendServerChannel.closeFuture().awaitUninterruptibly(10, TimeUnit.MILLISECONDS), + "Channel should be open"); + + // All backend connections the omnibus opened on behalf of this client should close if we disconnect the client + h2Connection.close().syncUninterruptibly(); + assertTrue(backendServerChannel.closeFuture().await(5, TimeUnit.SECONDS)); + } + + + @Test + void backpressure() throws ExecutionException, InterruptedException, TimeoutException { + final Channel h2Connection = connectToOmnibus(); + + // We'll take the client channel becoming unwritable as backpressure signal + final AtomicBoolean isWritable = new AtomicBoolean(true); + final CompletableFuture response = new CompletableFuture<>(); + final Http2StreamChannel stream = new Http2StreamChannelBootstrap(h2Connection) + .handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + isWritable.set(ctx.channel().isWritable()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + if (msg instanceof Http2HeadersFrame headers) { + response.complete(headers); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + }) + .open() + .syncUninterruptibly() + .getNow(); + + final Http2Headers headers = new DefaultHttp2Headers().method("POST").path("/test"); + stream.writeAndFlush(new DefaultHttp2HeadersFrame(headers, false)).syncUninterruptibly(); + + // Make the backend H2 stream processor 'slow' by disabling auto-read on it + final Channel backendServerChannel = backendConnection.join(); + backendServerChannel.config().setAutoRead(false); + + final byte[] chunk = new byte[16384]; + do { + // Write data until our own client hits the high watermark + while (isWritable.get()) { + // Try to wait until the write finishes but if it can't that's fine: we're trying to induce backpressure + stream + .writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(chunk), false)) + .awaitUninterruptibly(100, TimeUnit.MILLISECONDS); + Thread.yield(); + } + + // Make sure our channel is still unwritable for a bit since we haven't re-enabled auto-read yet. If we become + // writable it means we are hitting a lower watermark somewhere earlier in the stack, so we can try writing some + // more. Eventually all intermediate channels should flush and we should be stuck on the backend channel which + // will never make progress (because auto-read is disabled) + Thread.sleep(100); + } while (isWritable.get()); + stream.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(chunk), true)); + + // Now re-enable reads on the backend, which should eventually unblock our writes + backendConnection.resultNow().config().setAutoRead(true); + // Now we should eventually be able to send the last (endStream=true) write and get a response + assertEquals("200", response.get(5, TimeUnit.SECONDS).headers().status().toString()); + assertTrue(isWritable.get()); + + h2Connection.close().syncUninterruptibly(); + } + + @Test + void idleTest() throws Exception { + final InputStream keyStore = OmnibusH2ServerTest.class.getResourceAsStream("omnibus-h2-server-test-keystore.p12"); + final Duration timeout = Duration.ofMillis(500); + final OmnibusH2Server timeoutServer = new OmnibusH2Server( + SniMapper.buildSniMapping(keyStore, KEYSTORE_PASSWORD), + nioEventLoopGroup, + localEventLoopGroup, + new InetSocketAddress("127.0.0.1", 0), + new OmnibusRouter( + List.of(new OmnibusRouter.OmnibusRoute(PREFIX, prefixBackend.localAddress())), + defaultBackend.localAddress()), + timeout); + timeoutServer.start(); + + final Channel channel = connectToOmnibus(timeoutServer, null); + + // Send a request to make sure idleTimeouts work even after a stream / backend connection has been established + sendRequestThroughOmnibus(channel, "/a/different/path"); + + // The server should eventually close this idle connection + assertTrue(channel.closeFuture().awaitUninterruptibly(timeout.toMillis() * 5, TimeUnit.MILLISECONDS)); + + timeoutServer.stop(); + } + + private Channel startH2CServer(final boolean local, final String identity) throws InterruptedException { + final EventLoopGroup eventLoopGroup = local ? localEventLoopGroup : nioEventLoopGroup; + return new ServerBootstrap() + .group(eventLoopGroup, eventLoopGroup) + .channel(local ? LocalServerChannel.class : NioServerSocketChannel.class) + // Limit size of kernel TCP buffers to make it easier to hit backpressure in tests + .option(NioChannelOption.SO_RCVBUF, 8192) + .option(NioChannelOption.SO_SNDBUF, 8192) + .childHandler(new ChannelInitializer<>() { + @Override + protected void initChannel(final Channel ch) { + backendConnection.complete(ch); + ch.pipeline().addLast(Http2FrameCodecBuilder.forServer().build()); + ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(final Http2StreamChannel ch) { + ch.pipeline().addLast(new TestHandler(identity)); + } + })); + } + }) + .bind(local ? new LocalAddress(identity) : new InetSocketAddress("127.0.0.1", 0)) + .sync() + .channel(); + } + + private Channel connectToOmnibus() { + return connectToOmnibus(null, null); + } + + /// Makes an H2 connection to the omnibus at [this#server] on which new H2 streams can be opened + private Channel connectToOmnibus(@Nullable OmnibusH2Server server, @Nullable final HAProxyMessage proxyHeader) { + if (server == null) { + server = this.server; + } + final SslContext clientSsl; + try { + clientSsl = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)) + .build(); + } catch (SSLException e) { + throw new RuntimeException(e); + } + + final Bootstrap clientBootstrap = new Bootstrap() + .group(nioEventLoopGroup) + .channel(NioSocketChannel.class) + // Limit size of kernel TCP buffers to make it easier to hit backpressure in tests + .option(NioChannelOption.SO_RCVBUF, 8192) + .option(NioChannelOption.SO_SNDBUF, 8192) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(final SocketChannel ch) { + ch.pipeline().addLast(HAProxyMessageEncoder.INSTANCE); + } + }); + final Channel ch = clientBootstrap.connect(server.getLocalAddress()) + .syncUninterruptibly() + .channel(); + if (proxyHeader != null) { + ch.writeAndFlush(proxyHeader).syncUninterruptibly(); + } + ch.pipeline().remove(HAProxyMessageEncoder.INSTANCE); + ch.pipeline().addLast(clientSsl.newHandler(ch.alloc(), server.getLocalAddress().getHostName(), server.getLocalAddress().getPort())); + ch.pipeline().addLast(Http2FrameCodecBuilder.forClient() + .initialSettings(Http2Settings.defaultSettings()) + .build()); + ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelInboundHandlerAdapter())); + return ch; + } + + /// Sends an H2 request to [this#server] and returns the H2 response body + private String sendRequestThroughOmnibus(final String path) { + final Channel h2Connection = connectToOmnibus(); + final String result = sendRequestThroughOmnibus(h2Connection, path); + h2Connection.close().syncUninterruptibly(); + return result; + } + + private String sendRequestThroughOmnibus(final Channel h2Connection, final String path) { + final CompletableFuture responseFuture = new CompletableFuture<>(); + final Http2StreamChannelBootstrap streamBootstrap = new Http2StreamChannelBootstrap(h2Connection); + final Http2StreamChannel stream = streamBootstrap + .handler(new ResponseCollectorHandler(responseFuture)) + .open() + .syncUninterruptibly() + .getNow(); + + final Http2Headers headers = new DefaultHttp2Headers() + .method("POST") + .path(path) + .scheme("https") + .authority("localhost"); + + stream.writeAndFlush(new DefaultHttp2HeadersFrame(headers, true)); + + return responseFuture.join(); + } + + /// A backend that either echos the request body, returns an identity, or disconnects based on the request + private static class TestHandler extends ChannelInboundHandlerAdapter { + + // Returned if request has no body + private final String identity; + private final ByteBuf accumulated = Unpooled.buffer(); + + private TestHandler(final String identity) { + this.identity = identity; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof Http2HeadersFrame headers) { + final String path = headers.headers().path().toString(); + if (path.contains("reset")) { + ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.NO_ERROR)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } else if (path.contains("goaway")) { + ctx.channel().parent() + .writeAndFlush(new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR)) + .addListener(ChannelFutureListener.CLOSE); + } else if (path.contains("forwarded-for")) { + final String xForwardedFor = Optional + .ofNullable(headers.headers().get("x-forwarded-for")) + .map(CharSequence::toString) + .orElse(""); + writeResponse(ctx, Unpooled.copiedBuffer(xForwardedFor, StandardCharsets.UTF_8)); + } else if (headers.isEndStream()) { + writeResponse(ctx, Unpooled.copiedBuffer(identity, StandardCharsets.UTF_8)); + } + } else if (msg instanceof Http2DataFrame dataFrame) { + accumulated.writeBytes(dataFrame.content()); + if (dataFrame.isEndStream()) { + writeResponse(ctx, accumulated); + } + } + ReferenceCountUtil.release(msg); + } + + private void writeResponse(final ChannelHandlerContext ctx, final ByteBuf body) { + final Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + ctx.write(new DefaultHttp2HeadersFrame(responseHeaders, false)); + ctx.writeAndFlush(new DefaultHttp2DataFrame(body, true)); + } + } + + /// Completes the provided future with the first [Http2DataFrame] received + private static class ResponseCollectorHandler extends ChannelInboundHandlerAdapter { + + private final CompletableFuture responseFuture; + + ResponseCollectorHandler(final CompletableFuture responseFuture) { + this.responseFuture = responseFuture; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof Http2DataFrame dataFrame) { + responseFuture.complete(dataFrame.content().toString(StandardCharsets.UTF_8)); + } + ReferenceCountUtil.release(msg); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + responseFuture.completeExceptionally(cause); + } + } + + /// Completes the provided future with the first [Http2HeadersFrame] received + private static class HeadersCollectorHandler extends ChannelInboundHandlerAdapter { + + private final CompletableFuture responseFuture; + + HeadersCollectorHandler(final CompletableFuture responseFuture) { + this.responseFuture = responseFuture; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof Http2HeadersFrame headers) { + responseFuture.complete(headers); + } + ReferenceCountUtil.release(msg); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + responseFuture.completeExceptionally(cause); + } + } + + /// Completes the provided future when an RST frame is received or errors if we don't get one + private static class RstCollectorHandler extends SimpleUserEventChannelHandler { + + private final CompletableFuture resetFuture; + + private RstCollectorHandler(final CompletableFuture resetFuture) { + this.resetFuture = resetFuture; + } + + @Override + protected void eventReceived(final ChannelHandlerContext ctx, final Http2ResetFrame evt) { + resetFuture.complete(evt); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) { + if (!resetFuture.isDone()) { + resetFuture.completeExceptionally(new IllegalStateException("Channel went inactive without RST")); + } + } + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandlerTest.java new file mode 100644 index 000000000..0e3b2cb9b --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/ProxyProtocolHandlerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.haproxy.HAProxyCommand; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.codec.haproxy.HAProxyMessageEncoder; +import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; +import org.junit.jupiter.api.Test; + +class ProxyProtocolHandlerTest extends AbstractLeakDetectionTest { + private static final HAProxyMessage PROXY_MESSAGE = new HAProxyMessage( + HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, + HAProxyProxiedProtocol.TCP4, "10.0.0.1", "10.0.0.2", 1234, 5678); + + @Test + void sendHeader() { + final EmbeddedChannel encoder = new EmbeddedChannel(HAProxyMessageEncoder.INSTANCE); + encoder.writeOutbound(PROXY_MESSAGE.retain()); + final ByteBuf ppv2Bytes = encoder.readOutbound(); + + final EmbeddedChannel ch = new EmbeddedChannel(new ProxyProtocolHandler()); + ch.writeInbound(ppv2Bytes); + final HAProxyMessage actual = ch.readInbound(); + assertEquals(PROXY_MESSAGE.protocolVersion(), actual.protocolVersion()); + assertEquals(PROXY_MESSAGE.sourceAddress(), actual.sourceAddress()); + } + + @Test + void sendHeaderSlowly() { + final EmbeddedChannel encoder = new EmbeddedChannel(HAProxyMessageEncoder.INSTANCE); + encoder.writeOutbound(PROXY_MESSAGE.retain()); + final ByteBuf ppv2Bytes = encoder.readOutbound(); + + final EmbeddedChannel ch = new EmbeddedChannel(new ProxyProtocolHandler()); + while (ppv2Bytes.isReadable()) { + assertNull(ch.readInbound()); + ch.writeInbound(ppv2Bytes.readBytes(1)); + } + + final HAProxyMessage actual = ch.readInbound(); + assertEquals(PROXY_MESSAGE.protocolVersion(), actual.protocolVersion()); + assertEquals(PROXY_MESSAGE.sourceAddress(), actual.sourceAddress()); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/SniMapperTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/SniMapperTest.java new file mode 100644 index 000000000..545b4862e --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/net/SniMapperTest.java @@ -0,0 +1,179 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ +package org.whispersystems.textsecuregcm.grpc.net; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.SimpleUserEventChannelHandler; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import io.netty.handler.ssl.SniHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.ssl.SslHandshakeCompletionEvent; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.Mapping; +import java.io.InputStream; +import java.security.cert.X509Certificate; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SNIHostName; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class SniMapperTest { + + // Configuration for precomputed keystore blob defined in sni-mapper-test-keystore.p12 + private static final String FOO_DOMAIN = "foo.example.com"; + private static final String BAR_DOMAIN = "bar.example.com"; + private static final String KEY_STORE_PASSWORD = "password"; + private static final String KEY_STORE_NAME = "sni-mapper-test-keystore.p12"; + + private DefaultEventLoopGroup eventLoopGroup; + private Channel serverChannel; + + @BeforeEach + void setUp() throws Exception { + final InputStream keyStore = SniMapper.class.getResourceAsStream(KEY_STORE_NAME); + eventLoopGroup = new DefaultEventLoopGroup(); + + final Mapping sniMapping = + SniMapper.buildSniMapping(keyStore, KEY_STORE_PASSWORD); + + final LocalAddress localAddress = new LocalAddress(SniMapper.class.getSimpleName()); + serverChannel = new ServerBootstrap() + .group(eventLoopGroup) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer<>() { + @Override + protected void initChannel(final Channel ch) { + ch.pipeline().addLast(new SniHandler(sniMapping)); + } + }) + .bind(localAddress) + .sync() + .channel(); + } + + @AfterEach + void tearDown() throws Exception { + if (serverChannel != null) { + serverChannel.close().sync(); + } + eventLoopGroup.shutdownGracefully(1, 1000, TimeUnit.MILLISECONDS).sync(); + } + + @Test + void unknownDomain() throws Exception { + final InputStream keyStore = SniMapper.class.getResourceAsStream(KEY_STORE_NAME); + final Mapping sniMapping = SniMapper.buildSniMapping(keyStore, KEY_STORE_PASSWORD); + assertNotNull(sniMapping.map("unknown.example.com")); + final X509Certificate defaultCertificate = connectAndGetServerCertificate("unknown.example.com", null); + + // bar.example.com is the lexicographically first domain, so we should default to it. + assertCertificateIsForDomain(defaultCertificate, BAR_DOMAIN); + } + + static List selectCertificate() { + return List.of( + Arguments.of(FOO_DOMAIN, List.of(), "Ed25519"), + Arguments.of(BAR_DOMAIN, List.of(), "Ed25519"), + Arguments.of(BAR_DOMAIN, List.of("ed25519"), "Ed25519"), + Arguments.of(FOO_DOMAIN, List.of("rsa_pss_rsae_sha256", "rsa_pss_rsae_sha384", "rsa_pss_rsae_sha512"), "SHA256withRSA"), + Arguments.of(FOO_DOMAIN, List.of("rsa_pss_rsae_sha256", "rsa_pss_rsae_sha384", "rsa_pss_rsae_sha512", "ed25519"), "SHA256withRSA"), + Arguments.of(FOO_DOMAIN, List.of("ed25519", "rsa_pss_rsae_sha256", "rsa_pss_rsae_sha384", "rsa_pss_rsae_sha512"), "Ed25519") + ); + } + + @ParameterizedTest + @MethodSource + void selectCertificate(final String sni, final List signatureSchemes, final String expectedSigAlgorithm) + throws Exception { + final X509Certificate serverCert = connectAndGetServerCertificate(sni, signatureSchemes.toArray(String[]::new)); + assertNotNull(serverCert); + assertCertificateIsForDomain(serverCert, sni); + assertEquals(expectedSigAlgorithm, serverCert.getSigAlgName()); + } + + private X509Certificate connectAndGetServerCertificate(final String sniHostname, + final String[] signatureSchemes) throws Exception { + final SslContext clientSsl = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .protocols("TLSv1.3") + .build(); + + final CompletableFuture certFuture = new CompletableFuture<>(); + + final Bootstrap clientBootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(LocalChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(final LocalChannel ch) { + final SSLEngine engine = clientSsl.newEngine(ch.alloc()); + + final SSLParameters params = engine.getSSLParameters(); + params.setServerNames(List.of(new SNIHostName(sniHostname))); + if (signatureSchemes != null && signatureSchemes.length != 0) { + params.setSignatureSchemes(signatureSchemes); + } + engine.setSSLParameters(params); + + final SslHandler sslHandler = new SslHandler(engine); + ch.pipeline().addLast(sslHandler); + ch.pipeline().addLast(new SimpleUserEventChannelHandler() { + @Override + protected void eventReceived(final ChannelHandlerContext ctx, final SslHandshakeCompletionEvent evt) { + if (!evt.isSuccess()) { + certFuture.completeExceptionally(evt.cause()); + return; + } + try { + final SSLSession session = sslHandler.engine().getSession(); + final X509Certificate cert = (X509Certificate) session.getPeerCertificates()[0]; + certFuture.complete(cert); + } catch (final SSLPeerUnverifiedException e) { + certFuture.completeExceptionally(e); + } + } + }); + } + }); + + final Channel clientChannel = clientBootstrap.connect(serverChannel.localAddress()).sync().channel(); + try { + return certFuture.get(5, TimeUnit.SECONDS); + } finally { + clientChannel.close().sync(); + } + } + + private static void assertCertificateIsForDomain(final X509Certificate cert, final String expectedDomain) + throws Exception { + assertTrue(cert.getSubjectAlternativeNames().stream() + .filter(san -> (int) san.getFirst() == 2) // dNSName + .map(san -> (String) san.get(1)) + .anyMatch(name -> name.equalsIgnoreCase(expectedDomain))); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java index 5d1d9cbe5..8b97f100f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java @@ -419,13 +419,6 @@ class FaultTolerantRedisClusterClientTest { return this; } - @Override - @Deprecated - public ClientResources.Builder dnsResolver(final DnsResolver dnsResolver) { - delegate.dnsResolver(dnsResolver); - return this; - } - @Override public ClientResources.Builder eventBus(final EventBus eventBus) { delegate.eventBus(eventBus); diff --git a/service/src/test/resources/config/test.yml b/service/src/test/resources/config/test.yml index 2771b8daa..fadd66ed9 100644 --- a/service/src/test/resources/config/test.yml +++ b/service/src/test/resources/config/test.yml @@ -337,6 +337,8 @@ dynamicConfig: object: | captcha: scoreFloor: 1.0 + grpcAllowList: + enableAll: true remoteConfig: globalConfig: # keys and values that are given to clients on GET /v1/config @@ -521,6 +523,8 @@ idlePrimaryDeviceReminder: grpc: port: 50051 + websocketPort: 8080 + h2c: true asnTable: s3Region: a-region diff --git a/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-omnibus-h2-server-test-certs.sh b/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-omnibus-h2-server-test-certs.sh new file mode 100755 index 000000000..5422e23bc --- /dev/null +++ b/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-omnibus-h2-server-test-certs.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# Generates self-signed local testing certificates for OmnibusH2ServerTest + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +WORK_DIR="$(mktemp -d)" +trap 'rm -rf "$WORK_DIR"' EXIT + +PASSWORD="password" +DAYS=36500 +OMNIBUS_KS="$SCRIPT_DIR/omnibus-h2-server-test-keystore.p12" + +openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out "$WORK_DIR/foo-rsa.key" 2>/dev/null; +openssl req -new -x509 -key "$WORK_DIR/foo-rsa.key" -out "$WORK_DIR/foo-rsa.crt" -days "$DAYS" -subj "/CN=foo.example.com" -addext "subjectAltName=DNS:foo.example.com" +openssl pkcs12 -export -in "$WORK_DIR/foo-rsa.crt" -inkey "$WORK_DIR/foo-rsa.key" -out "$WORK_DIR/foo-rsa.p12" -name foo -passout pass:$PASSWORD +keytool -importkeystore -noprompt -srckeystore "$WORK_DIR/foo-rsa.p12" -srcstoretype PKCS12 -srcstorepass $PASSWORD -destkeystore "$OMNIBUS_KS" -deststoretype PKCS12 -deststorepass $PASSWORD + +echo "Wrote keystore to $OMNIBUS_KS" diff --git a/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-sni-mapping-test-certs.sh b/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-sni-mapping-test-certs.sh new file mode 100755 index 000000000..418e5766b --- /dev/null +++ b/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/generate-sni-mapping-test-certs.sh @@ -0,0 +1,36 @@ +#!/bin/sh +# Generates self-signed local testing certificates for SniMapperTest + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +WORK_DIR="$(mktemp -d)" +trap 'rm -rf "$WORK_DIR"' EXIT + +PASSWORD="password" +DAYS=36500 +SNI_KS="$SCRIPT_DIR/sni-mapper-test-keystore.p12" + +openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out "$WORK_DIR/foo-rsa.key" 2>/dev/null; +openssl req -new -x509 -key "$WORK_DIR/foo-rsa.key" -out "$WORK_DIR/foo-rsa.crt" -days "$DAYS" -subj "/CN=foo.example.com" -addext "subjectAltName=DNS:foo.example.com" + +openssl genpkey -algorithm Ed25519 -out "$WORK_DIR/foo-ed25519.key" 2>/dev/null; +openssl req -new -x509 -key "$WORK_DIR/foo-ed25519.key" -out "$WORK_DIR/foo-ed25519.crt" -days "$DAYS" -subj "/CN=foo.example.com" -addext "subjectAltName=DNS:foo.example.com" + +openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:2048 -out "$WORK_DIR/bar-rsa.key" 2>/dev/null; +openssl req -new -x509 -key "$WORK_DIR/bar-rsa.key" -out "$WORK_DIR/bar-rsa.crt" -days "$DAYS" -subj "/CN=bar.example.com" -addext "subjectAltName=DNS:bar.example.com" + +openssl genpkey -algorithm Ed25519 -out "$WORK_DIR/bar-ed25519.key" 2>/dev/null; +openssl req -new -x509 -key "$WORK_DIR/bar-ed25519.key" -out "$WORK_DIR/bar-ed25519.crt" -days "$DAYS" -subj "/CN=BAR.EXAMPLE.COM" -addext "subjectAltName=DNS:BAR.EXAMPLE.COM" + +openssl pkcs12 -export -in "$WORK_DIR/foo-rsa.crt" -inkey "$WORK_DIR/foo-rsa.key" -out "$WORK_DIR/foo-rsa.p12" -name foo -passout pass:$PASSWORD +openssl pkcs12 -export -in "$WORK_DIR/foo-ed25519.crt" -inkey "$WORK_DIR/foo-ed25519.key" -out "$WORK_DIR/foo-ed25519.p12" -name foo-ed25519 -passout pass:$PASSWORD +openssl pkcs12 -export -in "$WORK_DIR/bar-rsa.crt" -inkey "$WORK_DIR/bar-rsa.key" -out "$WORK_DIR/bar-rsa.p12" -name bar -passout pass:$PASSWORD +openssl pkcs12 -export -in "$WORK_DIR/bar-ed25519.crt" -inkey "$WORK_DIR/bar-ed25519.key" -out "$WORK_DIR/bar-ed25519.p12" -name bar-ed25519 -passout pass:$PASSWORD + +keytool -importkeystore -noprompt -srckeystore "$WORK_DIR/foo-ed25519.p12" -srcstoretype PKCS12 -srcstorepass $PASSWORD -destkeystore "$SNI_KS" -deststoretype PKCS12 -deststorepass $PASSWORD +keytool -importkeystore -noprompt -srckeystore "$WORK_DIR/foo-rsa.p12" -srcstoretype PKCS12 -srcstorepass $PASSWORD -destkeystore "$SNI_KS" -deststoretype PKCS12 -deststorepass $PASSWORD +keytool -importkeystore -noprompt -srckeystore "$WORK_DIR/bar-ed25519.p12" -srcstoretype PKCS12 -srcstorepass $PASSWORD -destkeystore "$SNI_KS" -deststoretype PKCS12 -deststorepass $PASSWORD +keytool -importkeystore -noprompt -srckeystore "$WORK_DIR/bar-rsa.p12" -srcstoretype PKCS12 -srcstorepass $PASSWORD -destkeystore "$SNI_KS" -deststoretype PKCS12 -deststorepass $PASSWORD + +echo "Wrote 4 certificates for two SNIs to $SNI_KS" diff --git a/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/omnibus-h2-server-test-keystore.p12 b/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/omnibus-h2-server-test-keystore.p12 new file mode 100644 index 0000000000000000000000000000000000000000..3e6200dd89d301df2206ddf776454fefcf216117 GIT binary patch literal 2658 zcma);XHXLg633Ge0)!SiJRv3m^5_W&_~<;URFMu+q>5DOpr`~vLhrpxMSR z29akXC-vm zxtd#t_8HJ{lLBlT*3mT0K^c2JAh{*b%AIo+nEou6U*P zDdjh;RSJ1PP;#`gITrPztI0R}7Geey0O+V1bOs+TbeEa2W0#uuL2aErHx;<~ivWDa6su?c!In~jQ;N_M0> zAIZ;QlNK1G6-Zt-O!b-A{;7oXy&JKO)M3hG96lIRO-;$9pl4X|hz8Er@7x%T5cDXH z4dLA9Rq=)g&17U>ZGwjuLhCnwMv!oAah*@r6wTRrOT*MXBx|C%qiZ?Hioah1zdwViaX4XbV3X?%-cUFM}&&#K{$>mwFEu3*fTOgCDnBFk$QlI*Z6-!}>V zT;%+EmRm$W*nMs$(e-Fza~82`31<}ONoKj);2zyKNDw*mWMh@G2OCdC!!r4|zx&gS z?y%i-eS=0V+3IRNtGzLNzRsbjJjC`TsMF!3jIXPf>gyYBsWX=(|9pbhuuc7T@(aBz zk7z--zmUv0-|}MVYC}9Q?u%fek4L9G1Vlgg7s6}8ebK3wuk$U=8VWAFUFXUn7F9JU z*`&h0JtsX$aCPaO5&tGoA?%(`^xs#BewR=Hh;~=Gplta`ep&lEV0w_o(4m=bi#2S; z*jbLRY+<^&<{4CvXdoum_Ijwh#2f5WCSDCS()J{|1zp!v-T=E1`&mHZW=*3=O6~%{ z-f4EI?a>k$fm?dODHt!RT&6^=4WCKL(`R+d;_fZ;&R>Xhbm^LDwf^%F2G=7yrOgG# z>TDYx@JG_9!;BIrAXQ~MFBPHjBoTL#R=`5*eKruF6BkIX%g&z-2sdUP!M19r!o zCdf^aTiF)q8rIWsK8S+!gGbB<+llP-XIWc`zJ_5JZBc3JF8kpPavXAu?>5b1mK_N) z?fYm+6w@zUQZS;);Shj5zzyJrLjIPS(2QVueMc8N1X@BuTtY+yEh;J^E{?*06@E35 zL2@x*k!wi^1O!|sp5G4ee*s#rklxQLo9QVt@moKh<5|JPQvBop2hf~!&gfbn$yJGT z`#O{7$evdi5*&*o@Hw`{n9m`QsDiywx!`avcnx{0$I?ntUa1&!%wP(So!I@V8y&OC zagKalK4ZQ%VS@s^@E?o9Lx{Pr_$mX9W9@D44RHs;sF%m;fm`=_#iIuE%*jbE9~sSy z#I|-N#H6h4j-@VBbl;t+vI$ytKW$sQv_sDFz!!UZ_g=+QMV|ah|4H8Bn1<5w`9=l6 z2>%~xJx*(DBgq5Yn7`+baVc|`{#Vw^2-syHr$uh_{eW-g-r9AQ9xa`rb^vHpjKVfs zvC+lPCNkQN-TOtU)Y;C5!g?AFH>8vU7RUS}=4f?rYOm1L61I$PIIN(vI!Mezy^b$< znnVFN+1^b0+?gsoRPoE<*@=uKPv<#Lx^su_q#@QFcU1{z&gGSe(yoet%xSDMD8;&c zGLA0yGcCf!Yu}$t)y+{6U|YoAvDLZ+vY9liWmIB6o9*G29^h$1YwILP#SZQWv%Xx6$%`g?9yl5DNMita%JHCa z(ilXvAfce@Q@@;)SDNUjj!tO>@POoBFh!c(0|&xI&BCJ~$_;Z5 z1<9FX*bB>lOd#dPj=MfLREx{*Uo91%Mku{*b_%2zW!sU(!#^FtZyd|X5Jz6TFWS~<{^90nUOyQ*I2 zvTRrkoZ4l5L>KNBFI^rnU3% zE#?I%4VN|+RO>4(o;Esu?&rrB$2eP->?JB{GV?ZP7eZB!9ro9(Q>g2u^af20{?6iP z6lYrH!PgKV?hl~vHO|veS|~J%`q$qD0+Is2h`bJC>Xl|m&txI1zuSC6ZhqfM?_84_ nos*`^oA*kbl?5}h{z*1P2%4>(slKC`dl|T8m9G>3FJ$}^W>(J` literal 0 HcmV?d00001 diff --git a/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/sni-mapper-test-keystore.p12 b/service/src/test/resources/org/whispersystems/textsecuregcm/grpc/net/sni-mapper-test-keystore.p12 new file mode 100644 index 0000000000000000000000000000000000000000..25125da351ad9ea306d066268fd9ae5126188ae6 GIT binary patch literal 6509 zcmbW6Wl$VkmWF9)+@W!|pn+~&I?%y`Td)wE#)AZxK#&Fk!5tDjxCIRacX#)Oh9F4@ zfi%v3v$Hi*TeY<_yLEoubI!eW>;3UQ_tXW4(K%uQu)tw-lsNc&2vr1{7=R1N52FJD z!{{);VYIj4Fk=1xRw7meh7rU5N<+b60DK(6|Jp@F0Km+DTMd3&0fzshADb9#0+#&e zk_gNX1ew_+ab$~3dTsKley_YkI*$nNnqgvclVJi_!9+NCe?Jcs9~%SAgoAI1PzBgw z0RenKvg07Gl>=|@XP2W>l*AH0*MP7evF#_SkJM_+HdresX`7R|sG`epJ;w^KqmFWW z>+&+rjGX;H$y^H8blNVnEL9uppq*7DFgzN($WONjIS7ytH89sdE{6|-&@xkFz%ktZ zipLms7`7O~7-D|~1VakU_Rlsd2na}F;OJ_{3XzZymk@@CL!?B+q`+a=@Bh0PVc37E zgbn>yav;**TMQ|GSZU)vRU6YVios!f3;K8DuwV&~hV}-oU3b@kDl05CfnmVu-DO5R z$``4w%@+bDnB_;N&B7nS*CL3pFl%4*hmU$9@+4d`qzx=c$pohvZ`s zy+o?u3OOnp44exV+GU^cL;HhG2`0w#Bg+f2VlY@-lvidP`bJi{Ae(okYD0%^L zamthRN=Dg_qiAU}T$Z+k-|vF&IZ--+c+M3|krWScrq^t_E)`Y&E$Me)A%kMAi;>GT6n41b5blnknx4zqr%dQ_F zi`(PjH0ssh{F8Jcsbk!?IsA~aL~ zAVV09ZGPmTT17~{YF-=ogNp0Wz-^27Cb&{2_j#u~OUpN^d$X z;YpD)a&8GdoDmq38)(pB4Wi_-^rU(fd{jSk$)rmsN2i*gX)~kmdp@o7kuhW0X0jE_ z&L>T!Y71$3ZOxGvd&GU@Yh;~iGhCNk8r9&fOb3Lmd%f3W_j;h-8$Mn&ErKM}&}K9m z%b82@8vG=AwWVhyo3S{@`_^`Kv~{>?`n91jyCH1MrExaD%leLq+SyU7t4^={`L~rb z>*q+jX1HJZgIuPub5o)tJKz>&WfMt@rRRg&4ap3NjakXwXvb=11SQ>qMQ14hRY^~f zCRcEXO@4p*(47xx`G^uOfBdU>$|^G=LhUhbP2YfK5}=_4-;=WcL+zMQW>-z>+0F(V z48s_6ALrBvZU0N~R5^Z=%hA?U1T~YMZptpf9eZTL5KoUJeB;Fue1Q+Fdc4uEisO0k>e!?n6GQmTCM(nQy_^d{;zp5{O{CROD*2{)nrM3VCWQWF$?FS4Z{ zYyH)|w+!mRx7m10g~h~z_)aHC$o$mg;*e)<)*s5xGU;;r?DBLClcf}IJuu>qfO=@? zPd6QhuroE6S%E13)@LrQox{E zSO-EXQgWfHeXJI78=Qsd_&V%vpzpC&kaKZU4S(Oscyb`9LPuqVYrAtAnH8KP&cGX1Qv zjkq|htRtEC?Q?aI@B7MbLcSnwC(*Ql28kVsXKXYVmCwLAVi+cohg=;Z609M+6!`Sy z?NOs1nRw5?Y~8EJe~WT((5_FjyEGIaw@rdDm_udX;jy3gi zJ4Gshg^$~U(!=l=Y~}d$kMDvvRyCtT_2G!tY>d^c!}A2b&&fXKGu#dPZt!S5*?=iD zp#iv|qQl`x9WPcY4Rd@lovgJ8-j2G)TBl zJ?ahfbxAHp)KteksvDe7MNr__J^Acn zpZm4Q3i6-&C2VA_v43Ka8^a=~v#}_4AEUwCz<0v#x*W|{7`Pd=l$lV57 z)7eFzG9#u+xdV#szx=twtV##@;q_I+pC+Up`Ld0QzPR1>ds--6 zM05TQDk(=cB=&OhEK;RS!=H7KfOm5-{Nb&Dk_STR5yY8K@;Qyx6T5-n3h%U}UP{u~ z0Ju7BaMdbqq46Jq3@X8Z1qzt(&WLwqgS+gO8Ep1*!kFq%w^p>AOxvFCJ`r1J-&g8K zNL?sqNI6sG&34rQMpAD3J&z%k)bQ`$fy5DBkv+6T?XoXqiQgqep{SctCRMq``iS@C zphDSVE#!fNl;)J~REPdXVH{ADi@7e*lieK=EIAuf(q}M=t1;+rnDcAkCf6w8@H~NX zRg5y9eXmAppL(E2#HXH4C@QHzZA9j8Ry|7^H+_#(uwh2qdwk9bh{mv zxnqJ2W|Iy=ahhGC?pP~|Tis5y^x7_ z@LaTdspFu6i2K#zIu4g96Q}%|Uj$R)ILkVh8PzXBWbmpZ?V`1YhdLH0={3EzQ(k2?c9d7)nB!Eb& zFbLL2bnc3qo;-rF_a1?0Xq38YlOlZi`R8X;`de2F(-$q!iWf?rrk2*7Zo zGRdilyE3j%=iN z$nbI`Jk8B=;9GLQfMzc~g>HFm(3(E~b{=Xxp)(a?v(RXgj|d(pRx-F0nEWy>}YHGvnvXn9^$xLk{E^E7tp=zEu zYW+y;xr%d)?rS&ET(KI5Q_Nk$Mu`ZE_qBsud;8+AJElUTQ7QB2Fr98J--$}H1pro~ z&h+h8MR4`4i?jgQyiur|a&)8TtH$SwNL|sC{&lyA#a44qQMOeDs&9XKYm?_(N(<-o zuHtB_Y5q_$-;G47OL)8~h`6E1H-w=(nvLpJ`!CvC)V?9Zi`9Y%+F8RrT|j_oFUO#?E>+Fs<_2OKoow_Zn_34LkAa)OcIZGFT<|BUSCJZ7{R4MFit$ zHhMMCG8FHhHT)imSS-BRQ@tIK2-V8K7JKyrPqLom$cV>dM=Rw-Q2{3OJh1Rrmh2c~ z#W6?sQ8qL1=gN^F?o2H2ai$MMLN=IiQjY^1v!XAq^#!#i?B!LLvym+0z11At_IM4w+I%=)2Ueo?%S=f*FSg27N_X0FCnY!5#@dk@>>?ldwDqPiDE z6BTl^gFZNw1;A__%$Yctppsa;;T)!h0lXmAM)d5*;MweL@`F_V85*ke3ChE$PYmv> zBm1XifEiywHvDY~jw@=!swOk7s=|4GvzzwvrViJ#xuC@h!{uM*ny10S}pGFSn@d*ou!ci+C@W z_rFbbo*uuT+jL{H=$;9!?Z2YLHboT|W(M(Y@0ER&`j!;G9;V^ewzdET00=r00y!KTKD7jzdAo#>*j+EhDa_ z-dJGkzShyJU2!M|c_nI1|LzJAe5G+5xEXX|ojMsv>f3|;5+Sv+m~CG`?= zJl%)yyBhwzs3+ZaZOWp4T^HH-k8ZrWsAx?zaB7NENJnF^V*vM$OjFf(ciE*w{)%L$ zlZeUX58^js+VtUj*r0t4znNRPp6KCUaT)z)*8@s%Pp&a=*Q0OWhN_=YRt-pytZ7JU zg)UH7quGv+vEsrwV^O@wde4@DcHD9vz{c(YHbHIFa2n}skX(2^0kp+Q%`JX-ZsPQy z(c3oCt8gwMK!or+;9wM9h-Jxw+za8}>d83lgtUlfXYJUMoVV8sdbm7Wq^2aLECOS#A+{V08%O4I z`->l)D()K}0p3J1&@K23XrgJS-}&mA?PDx8*CtSe6n=t&3GIp)PxzXdR%=4gvE21>*Yg^^l=z;o&Zyba^;rID+9=?XOp(>n0 zM~sZ@PDpZTee26*F)1CSe2^c-%(Mw*^wGn*+Z$O{?%N2c(Hn!;M}V3IDN5_TS3i9; z+^ZtNiI}CNacIJ%u0TP$M=w8LQr{GD60s2Ke~1$pINNc6UT@ej{OB^$Uq#nsEaWv&s{&dh()H@%j&+uL=HHq&Nn241}_)tOK99dlga#g&Z}ErSq*f z^l`hpABzwS_Nt=J`m^vEfLbg=C1cpVs_&o$y*^>pI_en+A{(z*_{eNH3x6j3?_L4l_nuiHZ)SAp+W+N;A&BWJa@;>rX4{5IKrf zL%2z&?)tn3FQbW`$^+w+EQ!}B+@;SVo@7>6f7|_H)dZWhNsj$~vbOXTBSjYd@~m0& zch<%_w|}Dmw!QYO=2X8~)yHK0C}rCxllX>9SAr)tn$w0C<%=$7KC~TDEZWli3FV`C zr@yxdR*EWZvt;VwH;D5uT`6MAwB}&`eJ-mBHH8ZqyVv@(C`7dXw07GotoXQi>qtKw z{;|P*g6}@B$MTm}vf;ibWfcXq2fynXwMU#iR$ES%#icrFNMP%?zUN^Y?<)Gfc0M`p zGxs4MG1)U!Q_lGcC-^8C((ia;;$$0$`OBCrTh>;Z{Pj?wfu`WgZ|gUpjQb)S@ z((r3u-GZ&qzIS==_BBmp6?2ttx%2`qrs`RI6Aw6zCCqHAl@;{y&W||h9dayR5N?bL z_%wgkGV#(uby;b2x37C`#t}$Zg1}a|FIyTXjo!6f!qm>3YL_3w(B5D9rDu?H zdv@3{U^scBY*oZc{8?&ueO*`#S0f<(%cE%-2^djHB@4g1{zt&mskA^GLI;_hFQ-IO?v!WKDR;^grEnku!gedk zj@kS^gvyRFcY>aYnnSL)CZdm49KLd`C7;;I(uvfT-h19T$bMZMA@|z}i={GYiFx+u0XOXP zz{=sqXf{|A3;`4W;}ZfCfQ