Introduce a (dormant) Noise/WebSocket for future client/server communication

This commit is contained in:
Jon Chambers
2024-02-23 11:42:42 -05:00
committed by GitHub
parent d2716fe5cf
commit a5774bf6ff
45 changed files with 3262 additions and 84 deletions

View File

@@ -298,11 +298,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private TusConfiguration tus;
@Valid
@NotNull
@JsonProperty
private int grpcPort;
@Valid
@NotNull
@JsonProperty
@@ -539,10 +534,6 @@ public class WhisperServerConfiguration extends Configuration {
return tus;
}
public int getGrpcPort() {
return grpcPort;
}
public ClientReleaseConfiguration getClientReleaseConfiguration() {
return clientRelease;
}

View File

@@ -21,13 +21,13 @@ import io.dropwizard.core.setup.Bootstrap;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jetty.HttpsConnectorFactory;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder;
import io.lettuce.core.metrics.MicrometerOptions;
import io.lettuce.core.resource.ClientResources;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.netty.channel.local.LocalAddress;
import java.net.http.HttpClient;
import java.time.Clock;
import java.time.Duration;
@@ -134,13 +134,14 @@ import org.whispersystems.textsecuregcm.grpc.AccountsGrpcService;
import org.whispersystems.textsecuregcm.grpc.ErrorMappingInterceptor;
import org.whispersystems.textsecuregcm.grpc.ExternalServiceCredentialsAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ExternalServiceCredentialsGrpcService;
import org.whispersystems.textsecuregcm.grpc.GrpcServerManagedWrapper;
import org.whispersystems.textsecuregcm.grpc.KeysAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.KeysGrpcService;
import org.whispersystems.textsecuregcm.grpc.PaymentsGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.UserAgentInterceptor;
import org.whispersystems.textsecuregcm.grpc.net.ManagedDefaultEventLoopGroup;
import org.whispersystems.textsecuregcm.grpc.net.ManagedLocalGrpcServer;
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
import org.whispersystems.textsecuregcm.limits.PushChallengeManager;
import org.whispersystems.textsecuregcm.limits.RateLimitChallengeManager;
@@ -753,20 +754,67 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final BasicCredentialAuthenticationInterceptor basicCredentialAuthenticationInterceptor =
new BasicCredentialAuthenticationInterceptor(new AccountAuthenticator(accountsManager));
final ServerBuilder<?> grpcServer = ServerBuilder.forPort(config.getGrpcPort())
.addService(ServerInterceptors.intercept(new AccountsGrpcService(accountsManager, rateLimiters, usernameHashZkProofVerifier, registrationRecoveryPasswordsManager), basicCredentialAuthenticationInterceptor))
.addService(new AccountsAnonymousGrpcService(accountsManager, rateLimiters))
.addService(ExternalServiceCredentialsGrpcService.createForAllExternalServices(config, rateLimiters))
.addService(ExternalServiceCredentialsAnonymousGrpcService.create(accountsManager, config))
.addService(ServerInterceptors.intercept(new KeysGrpcService(accountsManager, keysManager, rateLimiters), basicCredentialAuthenticationInterceptor))
.addService(new KeysAnonymousGrpcService(accountsManager, keysManager))
.addService(new PaymentsGrpcService(currencyManager))
.addService(ServerInterceptors.intercept(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager,
config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()), basicCredentialAuthenticationInterceptor))
.addService(new ProfileAnonymousGrpcService(accountsManager, profilesManager, profileBadgeConverter, zkProfileOperations));
final ManagedDefaultEventLoopGroup localEventLoopGroup = new ManagedDefaultEventLoopGroup();
final RemoteDeprecationFilter remoteDeprecationFilter = new RemoteDeprecationFilter(dynamicConfigurationManager);
final MetricCollectingServerInterceptor metricCollectingServerInterceptor =
new MetricCollectingServerInterceptor(Metrics.globalRegistry);
final ErrorMappingInterceptor errorMappingInterceptor = new ErrorMappingInterceptor();
final AcceptLanguageInterceptor acceptLanguageInterceptor = new AcceptLanguageInterceptor();
final UserAgentInterceptor userAgentInterceptor = new UserAgentInterceptor();
final LocalAddress anonymousGrpcServerAddress = new LocalAddress("grpc-anonymous");
final LocalAddress authenticatedGrpcServerAddress = new LocalAddress("grpc-authenticated");
final ManagedLocalGrpcServer anonymousGrpcServer = new ManagedLocalGrpcServer(anonymousGrpcServerAddress, localEventLoopGroup) {
@Override
protected void configureServer(final ServerBuilder<?> serverBuilder) {
// Note: interceptors run in the reverse order they are added; the remote deprecation filter
// depends on the user-agent context so it has to come first here!
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
serverBuilder
// TODO: specialize metrics with user-agent platform
.intercept(metricCollectingServerInterceptor)
.intercept(errorMappingInterceptor)
.intercept(acceptLanguageInterceptor)
.intercept(remoteDeprecationFilter)
.intercept(userAgentInterceptor)
.addService(new AccountsAnonymousGrpcService(accountsManager, rateLimiters))
.addService(new KeysAnonymousGrpcService(accountsManager, keysManager))
.addService(new PaymentsGrpcService(currencyManager))
.addService(ExternalServiceCredentialsAnonymousGrpcService.create(accountsManager, config))
.addService(new ProfileAnonymousGrpcService(accountsManager, profilesManager, profileBadgeConverter, zkProfileOperations));
}
};
final ManagedLocalGrpcServer authenticatedGrpcServer = new ManagedLocalGrpcServer(authenticatedGrpcServerAddress, localEventLoopGroup) {
@Override
protected void configureServer(final ServerBuilder<?> serverBuilder) {
// Note: interceptors run in the reverse order they are added; the remote deprecation filter
// depends on the user-agent context so it has to come first here!
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
serverBuilder
// TODO: specialize metrics with user-agent platform
.intercept(metricCollectingServerInterceptor)
.intercept(errorMappingInterceptor)
.intercept(acceptLanguageInterceptor)
.intercept(remoteDeprecationFilter)
.intercept(userAgentInterceptor)
.intercept(new BasicCredentialAuthenticationInterceptor(new AccountAuthenticator(accountsManager)))
.addService(new AccountsGrpcService(accountsManager, rateLimiters, usernameHashZkProofVerifier, registrationRecoveryPasswordsManager))
.addService(ExternalServiceCredentialsGrpcService.createForAllExternalServices(config, rateLimiters))
.addService(new KeysGrpcService(accountsManager, keysManager, rateLimiters))
.addService(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager,
config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()));
}
};
environment.lifecycle().manage(localEventLoopGroup);
environment.lifecycle().manage(anonymousGrpcServer);
environment.lifecycle().manage(authenticatedGrpcServer);
final List<Filter> filters = new ArrayList<>();
final RemoteDeprecationFilter remoteDeprecationFilter = new RemoteDeprecationFilter(dynamicConfigurationManager);
filters.add(remoteDeprecationFilter);
filters.add(new RemoteAddressFilter(useRemoteAddress));
@@ -776,19 +824,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), false, "/*");
}
// Note: interceptors run in the reverse order they are added; the remote deprecation filter
// depends on the user-agent context so it has to come first here!
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
grpcServer
// TODO: specialize metrics with user-agent platform
.intercept(new MetricCollectingServerInterceptor(Metrics.globalRegistry))
.intercept(new ErrorMappingInterceptor())
.intercept(new AcceptLanguageInterceptor())
.intercept(remoteDeprecationFilter)
.intercept(new UserAgentInterceptor());
environment.lifecycle().manage(new GrpcServerManagedWrapper(grpcServer.build()));
final AuthFilter<BasicCredentials, AuthenticatedAccount> accountAuthFilter =
new BasicCredentialAuthFilter.Builder<AuthenticatedAccount>()
.setAuthenticator(accountAuthenticator)

View File

@@ -1,35 +0,0 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import io.dropwizard.lifecycle.Managed;
import io.grpc.Server;
public class GrpcServerManagedWrapper implements Managed {
private final Server server;
public GrpcServerManagedWrapper(final Server server) {
this.server = server;
}
@Override
public void start() throws IOException {
server.start();
}
@Override
public void stop() {
try {
server.shutdown().awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
server.shutdownNow();
}
}
}

View File

@@ -0,0 +1,124 @@
package org.whispersystems.textsecuregcm.grpc.net;
import com.southernstorm.noise.protocol.HandshakeState;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.util.internal.EmptyArrays;
import java.security.NoSuchAlgorithmException;
import javax.crypto.BadPaddingException;
import javax.crypto.ShortBufferException;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
/**
* An abstract base class for XX- and NX-patterned Noise responder handshake handlers.
*
* @see <a href="https://noiseprotocol.org/noise.html">The Noise Protocol Framework</a>
*/
abstract class AbstractNoiseHandshakeHandler extends ChannelInboundHandlerAdapter {
private final ECKeyPair ecKeyPair;
private final byte[] publicKeySignature;
private final HandshakeState handshakeState;
private static final int EXPECTED_EPHEMERAL_KEY_MESSAGE_LENGTH = 32;
/**
* Constructs a new Noise handler with the given static server keys and static public key signature. The static public
* key must be signed by a trusted root private key whose public key is known to and trusted by authenticating
* clients.
*
* @param noiseProtocolName the name of the Noise protocol implemented by this handshake handler
* @param ecKeyPair the static key pair for this server
* @param publicKeySignature an Ed25519 signature of the raw bytes of the static public key
*/
AbstractNoiseHandshakeHandler(final String noiseProtocolName,
final ECKeyPair ecKeyPair,
final byte[] publicKeySignature) {
this.ecKeyPair = ecKeyPair;
this.publicKeySignature = publicKeySignature;
try {
this.handshakeState = new HandshakeState(noiseProtocolName, HandshakeState.RESPONDER);
} catch (final NoSuchAlgorithmException e) {
throw new AssertionError("Unsupported Noise algorithm: " + noiseProtocolName, e);
}
}
protected HandshakeState getHandshakeState() {
return handshakeState;
}
/**
* Handles an initial ephemeral key message from a client, advancing the handshake state and sending the server's
* static keys to the client. Both XX and NX patterns begin with a client sending its ephemeral key to the server.
* Clients must not include an additional payload with their ephemeral key message. The server's reply contains its
* static keys along with an Ed25519 signature of its public static key by a trusted root key.
*
* @param context the channel handler context for this message
* @param frame the websocket frame containing the ephemeral key message
*
* @throws NoiseHandshakeException if the ephemeral key message from the client was not of the expected size or if a
* general Noise encryption error occurred
*/
protected void handleEphemeralKeyMessage(final ChannelHandlerContext context, final BinaryWebSocketFrame frame)
throws NoiseHandshakeException {
if (frame.content().readableBytes() != EXPECTED_EPHEMERAL_KEY_MESSAGE_LENGTH) {
throw new NoiseHandshakeException("Unexpected ephemeral key message length");
}
// Cryptographically initializing a handshake is expensive, and so we defer it until we're confident the client is
// making a good-faith effort to perform a handshake (i.e. now). Noise-java in particular will derive a public key
// from the supplied private key (and will in fact overwrite any previously-set public key when setting a private
// key), so we just set the private key here.
handshakeState.getLocalKeyPair().setPrivateKey(ecKeyPair.getPrivateKey().serialize(), 0);
handshakeState.start();
// The initial message from the client should just include a plaintext ephemeral key with no payload. The frame is
// coming off the wire and so will be in a direct buffer that doesn't have a backing array.
final byte[] ephemeralKeyMessage = ByteBufUtil.getBytes(frame.content());
frame.content().readBytes(ephemeralKeyMessage);
try {
handshakeState.readMessage(ephemeralKeyMessage, 0, ephemeralKeyMessage.length, EmptyArrays.EMPTY_BYTES, 0);
} catch (final ShortBufferException e) {
// This should never happen since we're checking the length of the frame up front
throw new NoiseHandshakeException("Unexpected client payload");
} catch (final BadPaddingException e) {
// It turns out this should basically never happen because (a) we're not using padding and (b) the "bad AEAD tag"
// subclass of a bad padding exception can only happen if we have some AD to check, which we don't for an
// ephemeral-key-only message
throw new NoiseHandshakeException("Invalid keys");
}
// Send our key material and public key signature back to the client; this buffer will include:
//
// - A 32-byte plaintext ephemeral key
// - A 32-byte encrypted static key
// - A 16-byte AEAD tag for the static key
// - The public key signature payload
// - A 16-byte AEAD tag for the payload
final byte[] keyMaterial = new byte[32 + 32 + 16 + publicKeySignature.length + 16];
try {
handshakeState.writeMessage(keyMaterial, 0, publicKeySignature, 0, publicKeySignature.length);
context.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(keyMaterial)))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} catch (final ShortBufferException e) {
// This should never happen for messages of known length that we control
throw new AssertionError("Key material buffer was too short for message", e);
}
}
@Override
public void handlerRemoved(final ChannelHandlerContext context) {
handshakeState.destroy();
}
}

View File

@@ -0,0 +1,24 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
enum ApplicationWebSocketCloseReason {
NOISE_HANDSHAKE_ERROR(4001),
CLIENT_AUTHENTICATION_ERROR(4002),
NOISE_ENCRYPTION_ERROR(4003),
REAUTHENTICATION_REQUIRED(4004);
private final int statusCode;
ApplicationWebSocketCloseReason(final int statusCode) {
this.statusCode = statusCode;
}
public int getStatusCode() {
return statusCode;
}
WebSocketCloseStatus toWebSocketCloseStatus(final String reason) {
return new WebSocketCloseStatus(statusCode, reason);
}
}

View File

@@ -0,0 +1,7 @@
package org.whispersystems.textsecuregcm.grpc.net;
/**
* Indicates that an attempt to authenticate a remote client failed for some reason.
*/
class ClientAuthenticationException extends Exception {
}

View File

@@ -0,0 +1,59 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import javax.crypto.BadPaddingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An error handler serves as a general backstop for exceptions elsewhere in the pipeline. If the client has completed a
* WebSocket handshake, the error handler will send appropriate WebSocket closure codes to the client in an attempt to
* identify the problem. If the client has not completed a WebSocket handshake, the handler simply closes the
* connection.
*/
class ErrorHandler extends ChannelInboundHandlerAdapter {
private boolean websocketHandshakeComplete = false;
private static final Logger log = LoggerFactory.getLogger(ErrorHandler.class);
@Override
public void userEventTriggered(final ChannelHandlerContext context, final Object event) throws Exception {
if (event instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
setWebsocketHandshakeComplete();
}
context.fireUserEventTriggered(event);
}
protected void setWebsocketHandshakeComplete() {
this.websocketHandshakeComplete = true;
}
@Override
public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
if (websocketHandshakeComplete) {
final WebSocketCloseStatus webSocketCloseStatus = switch (cause) {
case NoiseHandshakeException e -> ApplicationWebSocketCloseReason.NOISE_HANDSHAKE_ERROR.toWebSocketCloseStatus(e.getMessage());
case ClientAuthenticationException ignored -> ApplicationWebSocketCloseReason.CLIENT_AUTHENTICATION_ERROR.toWebSocketCloseStatus("Not authenticated");
case BadPaddingException ignored -> ApplicationWebSocketCloseReason.NOISE_ENCRYPTION_ERROR.toWebSocketCloseStatus("Noise encryption error");
default -> {
log.warn("An unexpected exception reached the end of the pipeline", cause);
yield WebSocketCloseStatus.INTERNAL_SERVER_ERROR;
}
};
context.writeAndFlush(new CloseWebSocketFrame(webSocketCloseStatus))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
// We haven't completed a websocket handshake, so we can't really communicate errors in a semantically-meaningful
// way; just close the connection instead.
context.close();
}
}
}

View File

@@ -0,0 +1,96 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An "establish local connection" handler waits for a Noise handshake to complete upstream in the pipeline, buffering
* any inbound messages until the connection is fully-established, and then opens a proxy connection to a local gRPC
* server.
*/
class EstablishLocalGrpcConnectionHandler extends ChannelInboundHandlerAdapter {
private final LocalAddress authenticatedGrpcServerAddress;
private final LocalAddress anonymousGrpcServerAddress;
private final List<Object> pendingReads = new ArrayList<>();
private static final Logger log = LoggerFactory.getLogger(EstablishLocalGrpcConnectionHandler.class);
public EstablishLocalGrpcConnectionHandler(final LocalAddress authenticatedGrpcServerAddress,
final LocalAddress anonymousGrpcServerAddress) {
this.authenticatedGrpcServerAddress = authenticatedGrpcServerAddress;
this.anonymousGrpcServerAddress = anonymousGrpcServerAddress;
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
pendingReads.add(message);
}
@Override
public void userEventTriggered(final ChannelHandlerContext remoteChannelContext, final Object event) throws Exception {
if (event instanceof NoiseHandshakeCompleteEvent noiseHandshakeCompleteEvent) {
// We assume that we'll only get a completed handshake event if the handshake met all authentication requirements
// for the requested service. If the handshake doesn't have an authenticated device, we assume we're trying to
// connect to the anonymous service. If it does have an authenticated device, we assume we're aiming for the
// authenticated service.
final LocalAddress grpcServerAddress = noiseHandshakeCompleteEvent.authenticatedDevice().isPresent()
? authenticatedGrpcServerAddress
: anonymousGrpcServerAddress;
new Bootstrap()
.remoteAddress(grpcServerAddress)
// TODO Set local address
.channel(LocalChannel.class)
.group(remoteChannelContext.channel().eventLoop())
.handler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(final LocalChannel localChannel) {
localChannel.pipeline().addLast(new ProxyHandler(remoteChannelContext.channel()));
}
})
.connect()
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// Close the local connection if the remote channel closes and vice versa
remoteChannelContext.channel().closeFuture().addListener(closeFuture -> future.channel().close());
future.channel().closeFuture().addListener(closeFuture ->
remoteChannelContext.write(new CloseWebSocketFrame(WebSocketCloseStatus.SERVICE_RESTART)));
remoteChannelContext.pipeline()
.addAfter(remoteChannelContext.name(), null, new ProxyHandler(future.channel()));
// Flush any buffered reads we accumulated while waiting to open the connection
pendingReads.forEach(remoteChannelContext::fireChannelRead);
pendingReads.clear();
remoteChannelContext.pipeline().remove(EstablishLocalGrpcConnectionHandler.this);
} else {
log.warn("Failed to establish local connection to gRPC server", future.cause());
remoteChannelContext.close();
}
});
}
remoteChannelContext.fireUserEventTriggered(event);
}
@Override
public void handlerRemoved(final ChannelHandlerContext context) {
pendingReads.forEach(ReferenceCountUtil::release);
pendingReads.clear();
}
}

View File

@@ -0,0 +1,16 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.dropwizard.lifecycle.Managed;
import io.netty.channel.DefaultEventLoopGroup;
/**
* A wrapper for a Netty {@link DefaultEventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing
* Dropwizard to manage the lifecycle of the event loop group.
*/
public class ManagedDefaultEventLoopGroup extends DefaultEventLoopGroup implements Managed {
@Override
public void stop() throws InterruptedException {
this.shutdownGracefully().await();
}
}

View File

@@ -0,0 +1,49 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.dropwizard.lifecycle.Managed;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* A managed, local gRPC server configures and wraps a gRPC {@link Server} that listens on a Netty {@link LocalAddress}
* and whose lifecycle is managed by Dropwizard via the {@link Managed} interface.
*/
public abstract class ManagedLocalGrpcServer implements Managed {
private final Server server;
public ManagedLocalGrpcServer(final LocalAddress localAddress,
final DefaultEventLoopGroup eventLoopGroup) {
final ServerBuilder<?> serverBuilder = NettyServerBuilder.forAddress(localAddress)
.channelType(LocalServerChannel.class)
.bossEventLoopGroup(eventLoopGroup)
.workerEventLoopGroup(eventLoopGroup);
configureServer(serverBuilder);
server = serverBuilder.build();
}
protected abstract void configureServer(final ServerBuilder<?> serverBuilder);
@Override
public void start() throws IOException {
server.start();
}
@Override
public void stop() {
try {
server.shutdown().awaitTermination(5, TimeUnit.MINUTES);
} catch (final InterruptedException e) {
server.shutdownNow();
}
}
}

View File

@@ -0,0 +1,16 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.dropwizard.lifecycle.Managed;
import io.netty.channel.nio.NioEventLoopGroup;
/**
* A wrapper for a Netty {@link NioEventLoopGroup} that implements Dropwizard's {@link Managed} interface, allowing
* Dropwizard to manage the lifecycle of the event loop group.
*/
public class ManagedNioEventLoopGroup extends NioEventLoopGroup implements Managed {
@Override
public void stop() throws Exception {
this.shutdownGracefully().await();
}
}

View File

@@ -0,0 +1,13 @@
package org.whispersystems.textsecuregcm.grpc.net;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
import java.util.Optional;
/**
* An event that indicates that a Noise handshake has completed, possibly authenticating a caller in the process.
*
* @param authenticatedDevice the device authenticated as part of the handshake, or empty if the handshake was not of a
* type that performs authentication
*/
record NoiseHandshakeCompleteEvent(Optional<AuthenticatedDevice> authenticatedDevice) {
}

View File

@@ -0,0 +1,12 @@
package org.whispersystems.textsecuregcm.grpc.net;
/**
* Indicates that some problem occurred while completing a Noise handshake (e.g. an unexpected message size/format or
* a general encryption error).
*/
class NoiseHandshakeException extends Exception {
public NoiseHandshakeException(final String message) {
super(message);
}
}

View File

@@ -0,0 +1,40 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import java.util.Optional;
import io.netty.util.ReferenceCountUtil;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
/**
* A Noise NX handler handles the responder side of a Noise NX handshake.
*/
class NoiseNXHandshakeHandler extends AbstractNoiseHandshakeHandler {
static final String NOISE_PROTOCOL_NAME = "Noise_NX_25519_ChaChaPoly_BLAKE2b";
NoiseNXHandshakeHandler(final ECKeyPair ecKeyPair, final byte[] publicKeySignature) {
super(NOISE_PROTOCOL_NAME, ecKeyPair, publicKeySignature);
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) throws Exception {
if (message instanceof BinaryWebSocketFrame frame) {
try {
handleEphemeralKeyMessage(context, frame);
} finally {
frame.release();
}
// All we need to do is accept the client's ephemeral key and send our own static keys; after that, we can consider
// the handshake complete
context.fireUserEventTriggered(new NoiseHandshakeCompleteEvent(Optional.empty()));
context.pipeline().replace(NoiseNXHandshakeHandler.this, null, new NoiseStreamHandler(getHandshakeState().split()));
} else {
// Anything except binary WebSocket frames should have been filtered out of the pipeline by now; treat this as an
// error
ReferenceCountUtil.release(message);
throw new IllegalArgumentException("Unexpected message in pipeline: " + message);
}
}
}

View File

@@ -0,0 +1,95 @@
package org.whispersystems.textsecuregcm.grpc.net;
import com.southernstorm.noise.protocol.CipherState;
import com.southernstorm.noise.protocol.CipherStatePair;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.ReferenceCountUtil;
import javax.crypto.BadPaddingException;
import javax.crypto.ShortBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Noise stream handler manages a bidirectional Noise session after a handshake has completed.
*/
class NoiseStreamHandler extends ChannelDuplexHandler {
private final CipherStatePair cipherStatePair;
private static final Logger log = LoggerFactory.getLogger(NoiseStreamHandler.class);
NoiseStreamHandler(CipherStatePair cipherStatePair) {
this.cipherStatePair = cipherStatePair;
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message)
throws ShortBufferException, BadPaddingException {
if (message instanceof BinaryWebSocketFrame frame) {
try {
final CipherState cipherState = cipherStatePair.getReceiver();
// We've read this frame off the wire, and so it's most likely a direct buffer that's not backed by an array.
// We'll need to copy it to a heap buffer.
final byte[] noiseBuffer = ByteBufUtil.getBytes(frame.content());
// Overwrite the ciphertext with the plaintext to avoid an extra allocation for a dedicated plaintext buffer
final int plaintextLength = cipherState.decryptWithAd(null, noiseBuffer, 0, noiseBuffer, 0, noiseBuffer.length);
context.fireChannelRead(Unpooled.wrappedBuffer(noiseBuffer, 0, plaintextLength));
} finally {
frame.release();
}
} else {
// Anything except binary WebSocket frames should have been filtered out of the pipeline by now; treat this as an
// error
ReferenceCountUtil.release(message);
throw new IllegalArgumentException("Unexpected message in pipeline: " + message);
}
}
@Override
public void write(final ChannelHandlerContext context, final Object message, final ChannelPromise promise) throws Exception {
if (message instanceof ByteBuf plaintext) {
try {
// TODO Buffer/consolidate Noise writes to avoid sending a bazillion tiny (or empty) frames
final CipherState cipherState = cipherStatePair.getSender();
final int plaintextLength = plaintext.readableBytes();
// We've read these bytes from a local connection; although that likely means they're backed by a heap array, the
// buffer is read-only and won't grant us access to the underlying array. Instead, we need to copy the bytes to a
// mutable array. We also want to encrypt in place, so we allocate enough extra space for the trailing MAC.
final byte[] noiseBuffer = new byte[plaintext.readableBytes() + cipherState.getMACLength()];
plaintext.readBytes(noiseBuffer, 0, plaintext.readableBytes());
// Overwrite the plaintext with the ciphertext to avoid an extra allocation for a dedicated ciphertext buffer
cipherState.encryptWithAd(null, noiseBuffer, 0, noiseBuffer, 0, plaintextLength);
context.write(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(noiseBuffer)), promise);
} finally {
plaintext.release();
}
} else {
if (!(message instanceof WebSocketFrame)) {
// Downstream handlers may write WebSocket frames that don't need to be encrypted (e.g. "close" frames that
// get issued in response to exceptions)
log.warn("Unexpected object in pipeline: {}", message);
}
context.write(message, promise);
}
}
@Override
public void handlerRemoved(final ChannelHandlerContext context) {
cipherStatePair.destroy();
}
}

View File

@@ -0,0 +1,178 @@
package org.whispersystems.textsecuregcm.grpc.net;
import com.southernstorm.noise.protocol.HandshakeState;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.util.ReferenceCountUtil;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.crypto.BadPaddingException;
import javax.crypto.ShortBufferException;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.storage.ClientPublicKeysManager;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
/**
* A Noise XX handler handles the responder side of a Noise XX handshake. This implementation expects clients to send
* identifying information (an account identifier and device ID) as an additional payload when sending its static key
* material. It compares the static public key against the stored public key for the identified device asynchronously,
* buffering traffic from the client until the authentication check completes.
*/
class NoiseXXHandshakeHandler extends AbstractNoiseHandshakeHandler {
private final ClientPublicKeysManager clientPublicKeysManager;
private AuthenticationState authenticationState = AuthenticationState.GET_EPHEMERAL_KEY;
private final List<BinaryWebSocketFrame> pendingInboundFrames = new ArrayList<>();
static final String NOISE_PROTOCOL_NAME = "Noise_XX_25519_ChaChaPoly_BLAKE2b";
// When the client sends its static key message, we expect:
//
// - A 32-byte encrypted static public key
// - A 16-byte AEAD tag for the static key
// - 17 bytes of identity data in the message payload (a UUID and a one-byte device ID)
// - A 16-byte AEAD tag for the identity payload
private static final int EXPECTED_CLIENT_STATIC_KEY_MESSAGE_LENGTH = 81;
private enum AuthenticationState {
GET_EPHEMERAL_KEY,
GET_STATIC_KEY,
CHECK_PUBLIC_KEY,
ERROR
}
public NoiseXXHandshakeHandler(final ClientPublicKeysManager clientPublicKeysManager,
final ECKeyPair ecKeyPair,
final byte[] publicKeySignature) {
super(NOISE_PROTOCOL_NAME, ecKeyPair, publicKeySignature);
this.clientPublicKeysManager = clientPublicKeysManager;
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) throws Exception {
if (message instanceof BinaryWebSocketFrame frame) {
try {
switch (authenticationState) {
case GET_EPHEMERAL_KEY -> {
try {
handleEphemeralKeyMessage(context, frame);
authenticationState = AuthenticationState.GET_STATIC_KEY;
} finally {
frame.release();
}
}
case GET_STATIC_KEY -> {
try {
handleStaticKey(context, frame);
authenticationState = AuthenticationState.CHECK_PUBLIC_KEY;
} finally {
frame.release();
}
}
case CHECK_PUBLIC_KEY -> {
// Buffer any inbound traffic until we've finished checking the client's public key
pendingInboundFrames.add(frame);
}
case ERROR -> {
// If authentication has failed for any reason, just discard inbound traffic until the channel closes
frame.release();
}
}
} catch (final ShortBufferException e) {
authenticationState = AuthenticationState.ERROR;
throw new NoiseHandshakeException("Unexpected payload length");
} catch (final BadPaddingException e) {
authenticationState = AuthenticationState.ERROR;
throw new ClientAuthenticationException();
}
} else {
// Anything except binary WebSocket frames should have been filtered out of the pipeline by now; treat this as an
// error
ReferenceCountUtil.release(message);
throw new IllegalArgumentException("Unexpected message in pipeline: " + message);
}
}
private void handleStaticKey(final ChannelHandlerContext context, final BinaryWebSocketFrame frame)
throws NoiseHandshakeException, ShortBufferException, BadPaddingException {
if (frame.content().readableBytes() != EXPECTED_CLIENT_STATIC_KEY_MESSAGE_LENGTH) {
throw new NoiseHandshakeException("Unexpected client static key message length");
}
final HandshakeState handshakeState = getHandshakeState();
// The websocket frame will have come right off the wire, and so needs to be copied from a non-array-backed direct
// buffer into a heap buffer.
final byte[] staticKeyAndClientIdentityMessage = ByteBufUtil.getBytes(frame.content());
// The payload from the client should be a UUID (16 bytes) followed by a device ID (1 byte)
final byte[] payload = new byte[17];
final UUID accountIdentifier;
final byte deviceId;
final int payloadBytesRead = handshakeState.readMessage(staticKeyAndClientIdentityMessage,
0, staticKeyAndClientIdentityMessage.length, payload, 0);
if (payloadBytesRead != 17) {
throw new NoiseHandshakeException("Unexpected identity payload length");
}
try {
accountIdentifier = UUIDUtil.fromBytes(payload, 0);
} catch (final IllegalArgumentException e) {
throw new NoiseHandshakeException("Could not parse account identifier");
}
deviceId = payload[16];
// Verify the identity of the caller by comparing the submitted static public key against the stored public key for
// the identified device
clientPublicKeysManager.findPublicKey(accountIdentifier, deviceId)
.whenCompleteAsync((maybePublicKey, throwable) -> maybePublicKey.ifPresentOrElse(storedPublicKey -> {
final byte[] publicKeyFromClient = new byte[handshakeState.getRemotePublicKey().getPublicKeyLength()];
handshakeState.getRemotePublicKey().getPublicKey(publicKeyFromClient, 0);
if (MessageDigest.isEqual(publicKeyFromClient, storedPublicKey.getPublicKeyBytes())) {
context.fireUserEventTriggered(new NoiseHandshakeCompleteEvent(
Optional.of(new AuthenticatedDevice(accountIdentifier, deviceId))));
context.pipeline().addAfter(context.name(), null, new NoiseStreamHandler(handshakeState.split()));
// Flush any buffered reads
pendingInboundFrames.forEach(context::fireChannelRead);
pendingInboundFrames.clear();
context.pipeline().remove(NoiseXXHandshakeHandler.this);
} else {
// We found a key, but it doesn't match what the caller submitted
context.fireExceptionCaught(new ClientAuthenticationException());
authenticationState = AuthenticationState.ERROR;
}
},
() -> {
// We couldn't find a key for the identified account/device
context.fireExceptionCaught(new ClientAuthenticationException());
authenticationState = AuthenticationState.ERROR;
}),
context.executor());
}
@Override
public void handlerRemoved(final ChannelHandlerContext context) {
super.handlerRemoved(context);
pendingInboundFrames.forEach(BinaryWebSocketFrame::release);
pendingInboundFrames.clear();
}
}

View File

@@ -0,0 +1,24 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* A proxy handler writes all data read from one channel to another peer channel.
*/
class ProxyHandler extends ChannelInboundHandlerAdapter {
private final Channel peerChannel;
public ProxyHandler(final Channel peerChannel) {
this.peerChannel = peerChannel;
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
peerChannel.writeAndFlush(message)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}

View File

@@ -0,0 +1,35 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.ReferenceCountUtil;
/**
* A "reject unsupported message" handler closes the channel if it receives messages it does not know how to process.
*/
public class RejectUnsupportedMessagesHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) throws Exception {
if (message instanceof final WebSocketFrame webSocketFrame) {
if (webSocketFrame instanceof final TextWebSocketFrame textWebSocketFrame) {
try {
context.writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INVALID_MESSAGE_TYPE));
} finally {
textWebSocketFrame.release();
}
} else {
// Allow all other types of WebSocket frames
context.fireChannelRead(webSocketFrame);
}
} else {
// Discard anything that's not a WebSocket frame
ReferenceCountUtil.release(message);
context.writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INVALID_MESSAGE_TYPE));
}
}
}

View File

@@ -0,0 +1,74 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
/**
* A WebSocket opening handshake handler serves as the "front door" for the WebSocket/Noise tunnel and gracefully
* rejects requests for anything other than a WebSocket connection to a known endpoint.
*/
class WebSocketOpeningHandshakeHandler extends ChannelInboundHandlerAdapter {
private final String authenticatedPath;
private final String anonymousPath;
WebSocketOpeningHandshakeHandler(final String authenticatedPath, final String anonymousPath) {
this.authenticatedPath = authenticatedPath;
this.anonymousPath = anonymousPath;
}
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) throws Exception {
if (message instanceof FullHttpRequest request) {
boolean shouldReleaseRequest = true;
try {
if (request.decoderResult().isSuccess()) {
if (HttpMethod.GET.equals(request.method())) {
if (authenticatedPath.equals(request.uri()) || anonymousPath.equals(request.uri())) {
if (request.headers().contains(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET, true)) {
// Pass the request along to the websocket handshake handler and remove ourselves from the pipeline
shouldReleaseRequest = false;
context.fireChannelRead(request);
context.pipeline().remove(this);
} else {
closeConnectionWithStatus(context, request, HttpResponseStatus.UPGRADE_REQUIRED);
}
} else {
closeConnectionWithStatus(context, request, HttpResponseStatus.NOT_FOUND);
}
} else {
closeConnectionWithStatus(context, request, HttpResponseStatus.METHOD_NOT_ALLOWED);
}
} else {
closeConnectionWithStatus(context, request, HttpResponseStatus.BAD_REQUEST);
}
} finally {
if (shouldReleaseRequest) {
request.release();
}
}
} else {
// Anything except HTTP requests should have been filtered out of the pipeline by now; treat this as an error
ReferenceCountUtil.release(message);
throw new IllegalArgumentException("Unexpected message in pipeline: " + message);
}
}
private static void closeConnectionWithStatus(final ChannelHandlerContext context,
final FullHttpRequest request,
final HttpResponseStatus status) {
context.writeAndFlush(new DefaultFullHttpResponse(request.protocolVersion(), status))
.addListener(ChannelFutureListener.CLOSE);
}
}

View File

@@ -0,0 +1,52 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
import org.whispersystems.textsecuregcm.storage.ClientPublicKeysManager;
/**
* A WebSocket handshake listener waits for a WebSocket handshake to complete, then replaces itself with the appropriate
* Noise handshake handler for the requested path.
*/
class WebsocketHandshakeCompleteListener extends ChannelInboundHandlerAdapter {
private final ClientPublicKeysManager clientPublicKeysManager;
private final ECKeyPair ecKeyPair;
private final byte[] publicKeySignature;
WebsocketHandshakeCompleteListener(final ClientPublicKeysManager clientPublicKeysManager,
final ECKeyPair ecKeyPair,
final byte[] publicKeySignature) {
this.clientPublicKeysManager = clientPublicKeysManager;
this.ecKeyPair = ecKeyPair;
this.publicKeySignature = publicKeySignature;
}
@Override
public void userEventTriggered(final ChannelHandlerContext context, final Object event) {
if (event instanceof WebSocketServerProtocolHandler.HandshakeComplete handshakeCompleteEvent) {
final ChannelHandler noiseHandshakeHandler = switch (handshakeCompleteEvent.requestUri()) {
case WebsocketNoiseTunnelServer.AUTHENTICATED_SERVICE_PATH ->
new NoiseXXHandshakeHandler(clientPublicKeysManager, ecKeyPair, publicKeySignature);
case WebsocketNoiseTunnelServer.ANONYMOUS_SERVICE_PATH ->
new NoiseNXHandshakeHandler(ecKeyPair, publicKeySignature);
default -> {
// The HttpHandler should have caught all of these cases already; we'll consider it an internal error if
// something slipped through.
throw new IllegalArgumentException("Unexpected URI: " + handshakeCompleteEvent.requestUri());
}
};
context.pipeline().replace(WebsocketHandshakeCompleteListener.this, null, noiseHandshakeHandler);
}
context.fireUserEventTriggered(event);
}
}

View File

@@ -0,0 +1,116 @@
package org.whispersystems.textsecuregcm.grpc.net;
import com.google.common.annotations.VisibleForTesting;
import com.southernstorm.noise.protocol.Noise;
import io.dropwizard.lifecycle.Managed;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProtocols;
import io.netty.handler.ssl.SslProvider;
import java.net.InetSocketAddress;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLException;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.ClientPublicKeysManager;
/**
* A WebSocket/Noise tunnel server accepts traffic from the public internet (in the form of Noise packets framed by
* binary WebSocket frames) and passes it through to a local gRPC server.
*/
public class WebsocketNoiseTunnelServer implements Managed {
private final ServerBootstrap bootstrap;
private ServerSocketChannel channel;
static final String AUTHENTICATED_SERVICE_PATH = "/authenticated";
static final String ANONYMOUS_SERVICE_PATH = "/anonymous";
private static final Logger log = LoggerFactory.getLogger(WebsocketNoiseTunnelServer.class);
public WebsocketNoiseTunnelServer(final int websocketPort,
final X509Certificate[] tlsCertificateChain,
final PrivateKey tlsPrivateKey,
final NioEventLoopGroup eventLoopGroup,
final Executor delegatedTaskExecutor,
final ClientPublicKeysManager clientPublicKeysManager,
final ECKeyPair ecKeyPair,
final byte[] publicKeySignature,
final LocalAddress authenticatedGrpcServerAddress,
final LocalAddress anonymousGrpcServerAddress) throws SSLException {
final SslProvider sslProvider;
if (OpenSsl.isAvailable()) {
log.info("Native OpenSSL provider is available; will use native provider");
sslProvider = SslProvider.OPENSSL;
} else {
log.info("No native SSL provider available; will use JDK provider");
sslProvider = SslProvider.JDK;
}
final SslContext sslContext = SslContextBuilder.forServer(tlsPrivateKey, tlsCertificateChain)
.clientAuth(ClientAuth.NONE)
.protocols(SslProtocols.TLS_v1_3)
.sslProvider(sslProvider)
.build();
this.bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.localAddress(websocketPort)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast(sslContext.newHandler(socketChannel.alloc(), delegatedTaskExecutor))
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(Noise.MAX_PACKET_LEN))
// The WebSocket opening handshake handler will remove itself from the pipeline once it has received a valid WebSocket upgrade
// request and passed it down the pipeline
.addLast(new WebSocketOpeningHandshakeHandler(AUTHENTICATED_SERVICE_PATH, ANONYMOUS_SERVICE_PATH))
.addLast(new WebSocketServerProtocolHandler("/", true))
.addLast(new RejectUnsupportedMessagesHandler())
// The WebSocket handshake complete listener will replace itself with an appropriate Noise handshake handler once
// a WebSocket handshake has been completed
.addLast(new WebsocketHandshakeCompleteListener(clientPublicKeysManager, ecKeyPair, publicKeySignature))
// This handler will open a local connection to the appropriate gRPC server and install a ProxyHandler
// once the Noise handshake has completed
.addLast(new EstablishLocalGrpcConnectionHandler(authenticatedGrpcServerAddress, anonymousGrpcServerAddress))
.addLast(new ErrorHandler());
}
});
}
@VisibleForTesting
InetSocketAddress getLocalAddress() {
return channel.localAddress();
}
@Override
public void start() throws InterruptedException {
channel = (ServerSocketChannel) bootstrap.bind().await().channel();
}
@Override
public void stop() throws InterruptedException {
if (channel != null) {
channel.close().await();
}
}
}

View File

@@ -8,7 +8,6 @@ package org.whispersystems.textsecuregcm.util;
import com.google.protobuf.ByteString;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.UUID;
public final class UUIDUtil {
@@ -40,6 +39,10 @@ public final class UUIDUtil {
return fromByteBuffer(ByteBuffer.wrap(bytes));
}
public static UUID fromBytes(final byte[] bytes, final int offset) {
return fromByteBuffer(ByteBuffer.wrap(bytes, offset, 16));
}
public static UUID fromByteBuffer(final ByteBuffer byteBuffer) {
try {
final long mostSigBits = byteBuffer.getLong();
@@ -52,12 +55,4 @@ public final class UUIDUtil {
throw new IllegalArgumentException("unexpected byte array length; was less than 16");
}
}
public static Optional<UUID> fromStringSafe(final String uuidString) {
try {
return Optional.of(UUID.fromString(uuidString));
} catch (final IllegalArgumentException e) {
return Optional.empty();
}
}
}