Add h2 omnibus server

This commit is contained in:
Ravi Khadiwala
2026-04-08 18:01:54 -05:00
committed by ravi-signal
parent 8d0ad7b085
commit 0beeb8a935
32 changed files with 2009 additions and 75 deletions
@@ -13,27 +13,28 @@ import io.dropwizard.testing.ConfigOverride;
import io.dropwizard.testing.junit5.DropwizardAppExtension;
import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
import io.dropwizard.util.Resources;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -41,14 +42,18 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.OpenTelemetryConfiguration;
import org.signal.chat.account.AccountsAnonymousGrpc;
import org.signal.chat.account.CheckAccountExistenceRequest;
import org.signal.chat.account.CheckAccountExistenceResponse;
import org.signal.chat.common.IdentityType;
import org.signal.chat.common.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.NoopAwsSdkMetricPublisher;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtension;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema;
import org.whispersystems.textsecuregcm.tests.util.TestWebsocketListener;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -65,6 +70,7 @@ class WhisperServerServiceTest {
System.setProperty("secrets.bundle.filename",
Resources.getResource("config/test-secrets-bundle.yml").getPath());
}
private static final int OMNIBUS_PORT = findAvailablePort();
private static WebSocketClient webSocketClient;
private static WebSocketClient h2WebSocketClient;
@@ -72,7 +78,8 @@ class WhisperServerServiceTest {
private static final DropwizardAppExtension<WhisperServerConfiguration> EXTENSION = new DropwizardAppExtension<>(
WhisperServerService.class, Resources.getResource("config/test.yml").getPath(),
// Tables will be created by the local DynamoDbExtension
ConfigOverride.config("dynamoDbClient.initTables", "false"));
ConfigOverride.config("dynamoDbClient.initTables", "false"),
ConfigOverride.config("grpc.port", String.valueOf(OMNIBUS_PORT)));
@RegisterExtension
public static final DynamoDbExtension DYNAMO_DB_EXTENSION = new DynamoDbExtension(DynamoDbExtensionSchema.Tables.values());
@@ -200,6 +207,51 @@ class WhisperServerServiceTest {
.build());
}
@Test
void omnibusWebsocket() throws Exception {
final HTTP2Client http2Client = new HTTP2Client(new ClientConnector());
final WebSocketClient h2WebSocketClient =
new WebSocketClient(new HttpClient(new HttpClientTransportOverHTTP2(http2Client)));
h2WebSocketClient.start();
final TestWebsocketListener testWebsocketListener = new TestWebsocketListener();
final Session session = h2WebSocketClient.connect(testWebsocketListener,
URI.create(String.format("ws://localhost:%d/v1/websocket/", OMNIBUS_PORT)))
.join();
final WebSocketResponseMessage keepAlive = testWebsocketListener.doGet("/v1/keepalive").join();
assertEquals(200, keepAlive.getStatus());
final WebSocketResponseMessage whoami = testWebsocketListener.doGet("/v1/accounts/whoami").join();
assertEquals(401, whoami.getStatus());
session.close();
h2WebSocketClient.stop();
}
@Test
void omnibusGrpc() throws Exception {
final ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", OMNIBUS_PORT)
.usePlaintext()
.build();
final Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("X-Forwarded-For", Metadata.ASCII_STRING_MARSHALLER), "127.0.0.1");
final AccountsAnonymousGrpc.AccountsAnonymousBlockingStub stub = AccountsAnonymousGrpc
.newBlockingStub(channel)
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
final CheckAccountExistenceResponse response = stub.checkAccountExistence(
CheckAccountExistenceRequest.newBuilder()
.setServiceIdentifier(ServiceIdentifier.newBuilder()
.setIdentityType(IdentityType.IDENTITY_TYPE_ACI)
.setUuid(UUIDUtil.toByteString(UUID.randomUUID()))
.build())
.build());
assertFalse(response.getAccountExists());
channel.shutdownNow();
channel.awaitTermination(1, TimeUnit.SECONDS);
}
private static DynamoDbClient getDynamoDbClient() {
final AwsCredentialsProvider awsCredentialsProvider = EXTENSION.getConfiguration().getAwsCredentialsConfiguration()
.build();
@@ -208,4 +260,12 @@ class WhisperServerServiceTest {
.buildSyncClient(awsCredentialsProvider, new NoopAwsSdkMetricPublisher());
}
private static int findAvailablePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}