Remove reactor-grpc as a dependency

This commit is contained in:
Jon Chambers
2026-02-24 22:56:07 -05:00
committed by Jon Chambers
parent 6d87b24ba3
commit 80893152c9
5 changed files with 72 additions and 117 deletions

View File

@@ -23,9 +23,7 @@ import org.junit.jupiter.api.Test;
import org.signal.chat.rpc.EchoRequest;
import org.signal.chat.rpc.EchoResponse;
import org.signal.chat.rpc.EchoServiceGrpc;
import org.signal.chat.rpc.ReactorEchoServiceGrpc;
import org.signal.chat.rpc.SimpleEchoServiceGrpc;
import reactor.core.publisher.Mono;
class ErrorMappingInterceptorTest {
@@ -34,7 +32,7 @@ class ErrorMappingInterceptorTest {
@BeforeEach
void setUp() throws Exception {
void setUp() {
channel = InProcessChannelBuilder.forName("ErrorMappingInterceptorTest")
.directExecutor()
.build();
@@ -70,43 +68,6 @@ class ErrorMappingInterceptorTest {
client.echo(EchoRequest.getDefaultInstance()));
}
@Test
public void includeDetailsReactiveGrpc() throws Exception {
final StatusRuntimeException e = StatusProto.toStatusRuntimeException(com.google.rpc.Status.newBuilder()
.setCode(Status.Code.INVALID_ARGUMENT.value())
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setDomain("test")
.setReason("TEST")
.build()))
.build());
server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest")
.directExecutor()
.addService(new ReactorEchoServiceErrorImpl(e))
.intercept(new ErrorMappingInterceptor())
.build()
.start();
final EchoServiceGrpc.EchoServiceBlockingStub client = EchoServiceGrpc.newBlockingStub(channel);
GrpcTestUtils.assertStatusException(Status.INVALID_ARGUMENT, "TEST", () ->
client.echo(EchoRequest.getDefaultInstance()));
}
@Test
public void mapIOExceptionsReactive() throws Exception {
server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest")
.directExecutor()
.addService(new ReactorEchoServiceErrorImpl(new IOException("test")))
.intercept(new ErrorMappingInterceptor())
.build()
.start();
final EchoServiceGrpc.EchoServiceBlockingStub client = EchoServiceGrpc.newBlockingStub(channel);
GrpcTestUtils.assertStatusException(Status.UNAVAILABLE, "UNAVAILABLE", () ->
client.echo(EchoRequest.getDefaultInstance()));
}
@Test
public void mapIOExceptionsSimple() throws Exception {
server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest")
@@ -135,26 +96,6 @@ class ErrorMappingInterceptorTest {
client.echo(EchoRequest.getDefaultInstance()));
}
static class ReactorEchoServiceErrorImpl extends ReactorEchoServiceGrpc.EchoServiceImplBase {
private final Exception exception;
ReactorEchoServiceErrorImpl(final Exception exception) {
this.exception = exception;
}
@Override
public Mono<EchoResponse> echo(final EchoRequest echoRequest) {
return Mono.error(exception);
}
@Override
public Throwable onErrorMap(Throwable throwable) {
return new IllegalArgumentException(throwable);
}
}
static class SimpleEchoServiceErrorImpl extends SimpleEchoServiceGrpc.EchoServiceImplBase {
private final RuntimeException exception;

View File

@@ -6,7 +6,9 @@
package org.whispersystems.textsecuregcm.grpc;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyByte;
import static org.mockito.ArgumentMatchers.eq;
@@ -17,6 +19,7 @@ import static org.whispersystems.textsecuregcm.grpc.GrpcTestUtils.assertStatusEx
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
@@ -28,6 +31,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.signal.chat.common.EcPreKey;
@@ -36,12 +43,12 @@ import org.signal.chat.common.KemSignedPreKey;
import org.signal.chat.common.ServiceIdentifier;
import org.signal.chat.keys.AccountPreKeyBundles;
import org.signal.chat.keys.CheckIdentityKeyRequest;
import org.signal.chat.keys.CheckIdentityKeyResponse;
import org.signal.chat.keys.DevicePreKeyBundle;
import org.signal.chat.keys.GetPreKeysAnonymousRequest;
import org.signal.chat.keys.GetPreKeysAnonymousResponse;
import org.signal.chat.keys.GetPreKeysRequest;
import org.signal.chat.keys.KeysAnonymousGrpc;
import org.signal.chat.keys.ReactorKeysAnonymousGrpc;
import org.signal.libsignal.protocol.IdentityKey;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.signal.libsignal.protocol.ecc.ECKeyPair;
@@ -64,7 +71,6 @@ import org.whispersystems.textsecuregcm.util.TestClock;
import org.whispersystems.textsecuregcm.util.TestRandomUtil;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
class KeysAnonymousGrpcServiceTest extends SimpleBaseGrpcTest<KeysAnonymousGrpcService, KeysAnonymousGrpc.KeysAnonymousBlockingStub> {
@@ -335,8 +341,10 @@ class KeysAnonymousGrpcServiceTest extends SimpleBaseGrpcTest<KeysAnonymousGrpcS
}
@Test
void checkIdentityKeys() {
final ReactorKeysAnonymousGrpc.ReactorKeysAnonymousStub reactiveKeysAnonymousStub = ReactorKeysAnonymousGrpc.newReactorStub(SimpleBaseGrpcTest.GRPC_SERVER_EXTENSION_UNAUTHENTICATED.getChannel());
void checkIdentityKeys() throws InterruptedException {
final KeysAnonymousGrpc.KeysAnonymousStub keysAnonymousStub =
KeysAnonymousGrpc.newStub(SimpleBaseGrpcTest.GRPC_SERVER_EXTENSION_UNAUTHENTICATED.getChannel());
when(accountsManager.getByServiceIdentifierAsync(any()))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
@@ -364,32 +372,58 @@ class KeysAnonymousGrpcServiceTest extends SimpleBaseGrpcTest<KeysAnonymousGrpcS
when(accountsManager.getByServiceIdentifierAsync(new PniServiceIdentifier(mismatchedPniFingerprintAccountIdentifier)))
.thenReturn(CompletableFuture.completedFuture(Optional.of(mismatchedPniFingerprintAccount)));
final Flux<CheckIdentityKeyRequest> requests = Flux.just(
buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, mismatchedAciFingerprintAccountIdentifier,
new IdentityKey(ECKeyPair.generate().getPublicKey())),
buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, matchingAciFingerprintAccountIdentifier,
matchingAciFingerprintAccountIdentityKey),
buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, UUID.randomUUID(),
new IdentityKey(ECKeyPair.generate().getPublicKey())),
buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, mismatchedPniFingerprintAccountIdentifier,
new IdentityKey(ECKeyPair.generate().getPublicKey()))
);
final Map<UUID, IdentityKey> expectedResponses = Map.of(
mismatchedAciFingerprintAccountIdentifier, mismatchedAciFingerprintAccountIdentityKey,
mismatchedPniFingerprintAccountIdentifier, mismatchedPniFingerpringAccountIdentityKey);
final Map<UUID, IdentityKey> responses = reactiveKeysAnonymousStub.checkIdentityKeys(requests)
.collectMap(response -> ServiceIdentifierUtil.fromGrpcServiceIdentifier(response.getTargetIdentifier()).uuid(),
response -> {
try {
return new IdentityKey(response.getIdentityKey().toByteArray());
} catch (InvalidKeyException e) {
throw new RuntimeException(e);
}
})
.block();
final Map<UUID, IdentityKey> responses = new ConcurrentHashMap<>();
final CountDownLatch completedLatch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>();
final StreamObserver<CheckIdentityKeyRequest> requestStreamObserver =
keysAnonymousStub.checkIdentityKeys(new StreamObserver<>() {
@Override
public void onNext(final CheckIdentityKeyResponse checkIdentityKeyResponse) {
try {
responses.put(
ServiceIdentifierUtil.fromGrpcServiceIdentifier(checkIdentityKeyResponse.getTargetIdentifier()).uuid(),
new IdentityKey(checkIdentityKeyResponse.getIdentityKey().toByteArray()));
} catch (final InvalidKeyException e) {
throw new RuntimeException(e);
}
}
@Override
public void onError(final Throwable throwable) {
error.set(throwable);
completedLatch.countDown();
}
@Override
public void onCompleted() {
completedLatch.countDown();
}
});
requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, mismatchedAciFingerprintAccountIdentifier,
new IdentityKey(ECKeyPair.generate().getPublicKey())));
requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, matchingAciFingerprintAccountIdentifier,
matchingAciFingerprintAccountIdentityKey));
requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, UUID.randomUUID(),
new IdentityKey(ECKeyPair.generate().getPublicKey())));
requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, mismatchedPniFingerprintAccountIdentifier,
new IdentityKey(ECKeyPair.generate().getPublicKey())));
requestStreamObserver.onCompleted();
if (!completedLatch.await(5, TimeUnit.SECONDS)) {
fail("Timed out waiting for countdown latch");
}
assertNull(error.get());
assertEquals(expectedResponses, responses);
}

View File

@@ -33,16 +33,15 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.signal.chat.require.Auth;
import org.signal.chat.rpc.Color;
import org.signal.chat.rpc.NestedMessage;
import org.signal.chat.rpc.ReactorAnonymousServiceGrpc;
import org.signal.chat.rpc.ReactorAuthServiceGrpc;
import org.signal.chat.rpc.ReactorValidationTestServiceGrpc;
import org.signal.chat.rpc.RecursiveMessage;
import org.signal.chat.rpc.SimpleAnonymousServiceGrpc;
import org.signal.chat.rpc.SimpleAuthServiceGrpc;
import org.signal.chat.rpc.SimpleValidationTestServiceGrpc;
import org.signal.chat.rpc.ValidationTestServiceGrpc;
import org.signal.chat.rpc.ValidationsRequest;
import org.signal.chat.rpc.ValidationsResponse;
import org.whispersystems.textsecuregcm.grpc.validators.ValidatorUtils;
import org.whispersystems.textsecuregcm.util.TestRandomUtil;
import reactor.core.publisher.Mono;
public class ValidatingInterceptorTest {
@@ -50,27 +49,27 @@ public class ValidatingInterceptorTest {
static final GrpcServerExtension GRPC_SERVER_EXTENSION = new GrpcServerExtension();
private static final class ValidationTestGrpcServiceImpl extends
ReactorValidationTestServiceGrpc.ValidationTestServiceImplBase {
SimpleValidationTestServiceGrpc.ValidationTestServiceImplBase {
@Override
public Mono<ValidationsResponse> validationsEndpoint(final ValidationsRequest request) {
return Mono.just(ValidationsResponse.newBuilder().build());
public ValidationsResponse validationsEndpoint(final ValidationsRequest request) {
return ValidationsResponse.getDefaultInstance();
}
}
private static final class AuthGrpcServiceImpl extends ReactorAuthServiceGrpc.AuthServiceImplBase {
private static final class AuthGrpcServiceImpl extends SimpleAuthServiceGrpc.AuthServiceImplBase {
@Override
public Mono<Empty> authenticatedMethod(final Empty request) {
return Mono.just(Empty.getDefaultInstance());
public Empty authenticatedMethod(final Empty request) {
return Empty.getDefaultInstance();
}
}
private static final class AnonymousGrpcServiceImpl extends ReactorAnonymousServiceGrpc.AnonymousServiceImplBase {
private static final class AnonymousGrpcServiceImpl extends SimpleAnonymousServiceGrpc.AnonymousServiceImplBase {
@Override
public Mono<Empty> anonymousMethod(final Empty request) {
return Mono.just(Empty.getDefaultInstance());
public Empty anonymousMethod(final Empty request) {
return Empty.getDefaultInstance();
}
}