diff --git a/pom.xml b/pom.xml index 87dce4cdb..72d524e3c 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,6 @@ See https://protobuf.dev/support/cross-version-runtime-guarantee/. --> 4.29.4 0.15.4 - 1.2.4 2025.0.2 2.3.0 3.1.0 @@ -151,11 +150,6 @@ pom import - - com.salesforce.servicelibs - reactor-grpc-stub - ${reactive.grpc.version} - io.github.resilience4j resilience4j-bom @@ -483,14 +477,6 @@ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} - - reactor-grpc - com.salesforce.servicelibs - reactor-grpc - ${reactive.grpc.version} - com.salesforce.reactorgrpc.ReactorGrpcGenerator - - simple org.signal diff --git a/service/pom.xml b/service/pom.xml index 757b2913b..c17638fd2 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -361,11 +361,6 @@ jackson-jaxrs-json-provider - - com.salesforce.servicelibs - reactor-grpc-stub - - org.foundationdb fdb-java diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java index af614b931..774a590e1 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java @@ -23,9 +23,7 @@ import org.junit.jupiter.api.Test; import org.signal.chat.rpc.EchoRequest; import org.signal.chat.rpc.EchoResponse; import org.signal.chat.rpc.EchoServiceGrpc; -import org.signal.chat.rpc.ReactorEchoServiceGrpc; import org.signal.chat.rpc.SimpleEchoServiceGrpc; -import reactor.core.publisher.Mono; class ErrorMappingInterceptorTest { @@ -34,7 +32,7 @@ class ErrorMappingInterceptorTest { @BeforeEach - void setUp() throws Exception { + void setUp() { channel = InProcessChannelBuilder.forName("ErrorMappingInterceptorTest") .directExecutor() .build(); @@ -70,43 +68,6 @@ class ErrorMappingInterceptorTest { client.echo(EchoRequest.getDefaultInstance())); } - @Test - public void includeDetailsReactiveGrpc() throws Exception { - final StatusRuntimeException e = StatusProto.toStatusRuntimeException(com.google.rpc.Status.newBuilder() - .setCode(Status.Code.INVALID_ARGUMENT.value()) - .addDetails(Any.pack(ErrorInfo.newBuilder() - .setDomain("test") - .setReason("TEST") - .build())) - .build()); - - server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest") - .directExecutor() - .addService(new ReactorEchoServiceErrorImpl(e)) - .intercept(new ErrorMappingInterceptor()) - .build() - .start(); - - final EchoServiceGrpc.EchoServiceBlockingStub client = EchoServiceGrpc.newBlockingStub(channel); - GrpcTestUtils.assertStatusException(Status.INVALID_ARGUMENT, "TEST", () -> - client.echo(EchoRequest.getDefaultInstance())); - } - - - @Test - public void mapIOExceptionsReactive() throws Exception { - server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest") - .directExecutor() - .addService(new ReactorEchoServiceErrorImpl(new IOException("test"))) - .intercept(new ErrorMappingInterceptor()) - .build() - .start(); - - final EchoServiceGrpc.EchoServiceBlockingStub client = EchoServiceGrpc.newBlockingStub(channel); - GrpcTestUtils.assertStatusException(Status.UNAVAILABLE, "UNAVAILABLE", () -> - client.echo(EchoRequest.getDefaultInstance())); - } - @Test public void mapIOExceptionsSimple() throws Exception { server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest") @@ -135,26 +96,6 @@ class ErrorMappingInterceptorTest { client.echo(EchoRequest.getDefaultInstance())); } - - static class ReactorEchoServiceErrorImpl extends ReactorEchoServiceGrpc.EchoServiceImplBase { - - private final Exception exception; - - ReactorEchoServiceErrorImpl(final Exception exception) { - this.exception = exception; - } - - @Override - public Mono echo(final EchoRequest echoRequest) { - return Mono.error(exception); - } - - @Override - public Throwable onErrorMap(Throwable throwable) { - return new IllegalArgumentException(throwable); - } - } - static class SimpleEchoServiceErrorImpl extends SimpleEchoServiceGrpc.EchoServiceImplBase { private final RuntimeException exception; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcServiceTest.java index 0de282635..adf70dfe3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcServiceTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/KeysAnonymousGrpcServiceTest.java @@ -6,7 +6,9 @@ package org.whispersystems.textsecuregcm.grpc; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyByte; import static org.mockito.ArgumentMatchers.eq; @@ -17,6 +19,7 @@ import static org.whispersystems.textsecuregcm.grpc.GrpcTestUtils.assertStatusEx import com.google.protobuf.ByteString; import io.grpc.Status; +import io.grpc.stub.StreamObserver; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.time.Duration; @@ -28,6 +31,10 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.signal.chat.common.EcPreKey; @@ -36,12 +43,12 @@ import org.signal.chat.common.KemSignedPreKey; import org.signal.chat.common.ServiceIdentifier; import org.signal.chat.keys.AccountPreKeyBundles; import org.signal.chat.keys.CheckIdentityKeyRequest; +import org.signal.chat.keys.CheckIdentityKeyResponse; import org.signal.chat.keys.DevicePreKeyBundle; import org.signal.chat.keys.GetPreKeysAnonymousRequest; import org.signal.chat.keys.GetPreKeysAnonymousResponse; import org.signal.chat.keys.GetPreKeysRequest; import org.signal.chat.keys.KeysAnonymousGrpc; -import org.signal.chat.keys.ReactorKeysAnonymousGrpc; import org.signal.libsignal.protocol.IdentityKey; import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.ecc.ECKeyPair; @@ -64,7 +71,6 @@ import org.whispersystems.textsecuregcm.util.TestClock; import org.whispersystems.textsecuregcm.util.TestRandomUtil; import org.whispersystems.textsecuregcm.util.UUIDUtil; import org.whispersystems.textsecuregcm.util.Util; -import reactor.core.publisher.Flux; class KeysAnonymousGrpcServiceTest extends SimpleBaseGrpcTest { @@ -335,8 +341,10 @@ class KeysAnonymousGrpcServiceTest extends SimpleBaseGrpcTest requests = Flux.just( - buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, mismatchedAciFingerprintAccountIdentifier, - new IdentityKey(ECKeyPair.generate().getPublicKey())), - buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, matchingAciFingerprintAccountIdentifier, - matchingAciFingerprintAccountIdentityKey), - buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, UUID.randomUUID(), - new IdentityKey(ECKeyPair.generate().getPublicKey())), - buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, mismatchedPniFingerprintAccountIdentifier, - new IdentityKey(ECKeyPair.generate().getPublicKey())) - ); - final Map expectedResponses = Map.of( mismatchedAciFingerprintAccountIdentifier, mismatchedAciFingerprintAccountIdentityKey, mismatchedPniFingerprintAccountIdentifier, mismatchedPniFingerpringAccountIdentityKey); - final Map responses = reactiveKeysAnonymousStub.checkIdentityKeys(requests) - .collectMap(response -> ServiceIdentifierUtil.fromGrpcServiceIdentifier(response.getTargetIdentifier()).uuid(), - response -> { - try { - return new IdentityKey(response.getIdentityKey().toByteArray()); - } catch (InvalidKeyException e) { - throw new RuntimeException(e); - } - }) - .block(); + final Map responses = new ConcurrentHashMap<>(); + final CountDownLatch completedLatch = new CountDownLatch(1); + final AtomicReference error = new AtomicReference<>(); + final StreamObserver requestStreamObserver = + keysAnonymousStub.checkIdentityKeys(new StreamObserver<>() { + @Override + public void onNext(final CheckIdentityKeyResponse checkIdentityKeyResponse) { + try { + responses.put( + ServiceIdentifierUtil.fromGrpcServiceIdentifier(checkIdentityKeyResponse.getTargetIdentifier()).uuid(), + new IdentityKey(checkIdentityKeyResponse.getIdentityKey().toByteArray())); + } catch (final InvalidKeyException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onError(final Throwable throwable) { + error.set(throwable); + completedLatch.countDown(); + } + + @Override + public void onCompleted() { + completedLatch.countDown(); + } + }); + + requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, mismatchedAciFingerprintAccountIdentifier, + new IdentityKey(ECKeyPair.generate().getPublicKey()))); + + requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_ACI, matchingAciFingerprintAccountIdentifier, + matchingAciFingerprintAccountIdentityKey)); + + requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, UUID.randomUUID(), + new IdentityKey(ECKeyPair.generate().getPublicKey()))); + + requestStreamObserver.onNext(buildCheckIdentityKeyRequest(org.signal.chat.common.IdentityType.IDENTITY_TYPE_PNI, mismatchedPniFingerprintAccountIdentifier, + new IdentityKey(ECKeyPair.generate().getPublicKey()))); + + requestStreamObserver.onCompleted(); + + if (!completedLatch.await(5, TimeUnit.SECONDS)) { + fail("Timed out waiting for countdown latch"); + } + + assertNull(error.get()); assertEquals(expectedResponses, responses); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ValidatingInterceptorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ValidatingInterceptorTest.java index 67b13bb78..024831d16 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ValidatingInterceptorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ValidatingInterceptorTest.java @@ -33,16 +33,15 @@ import org.junit.jupiter.params.provider.ValueSource; import org.signal.chat.require.Auth; import org.signal.chat.rpc.Color; import org.signal.chat.rpc.NestedMessage; -import org.signal.chat.rpc.ReactorAnonymousServiceGrpc; -import org.signal.chat.rpc.ReactorAuthServiceGrpc; -import org.signal.chat.rpc.ReactorValidationTestServiceGrpc; import org.signal.chat.rpc.RecursiveMessage; +import org.signal.chat.rpc.SimpleAnonymousServiceGrpc; +import org.signal.chat.rpc.SimpleAuthServiceGrpc; +import org.signal.chat.rpc.SimpleValidationTestServiceGrpc; import org.signal.chat.rpc.ValidationTestServiceGrpc; import org.signal.chat.rpc.ValidationsRequest; import org.signal.chat.rpc.ValidationsResponse; import org.whispersystems.textsecuregcm.grpc.validators.ValidatorUtils; import org.whispersystems.textsecuregcm.util.TestRandomUtil; -import reactor.core.publisher.Mono; public class ValidatingInterceptorTest { @@ -50,27 +49,27 @@ public class ValidatingInterceptorTest { static final GrpcServerExtension GRPC_SERVER_EXTENSION = new GrpcServerExtension(); private static final class ValidationTestGrpcServiceImpl extends - ReactorValidationTestServiceGrpc.ValidationTestServiceImplBase { + SimpleValidationTestServiceGrpc.ValidationTestServiceImplBase { @Override - public Mono validationsEndpoint(final ValidationsRequest request) { - return Mono.just(ValidationsResponse.newBuilder().build()); + public ValidationsResponse validationsEndpoint(final ValidationsRequest request) { + return ValidationsResponse.getDefaultInstance(); } } - private static final class AuthGrpcServiceImpl extends ReactorAuthServiceGrpc.AuthServiceImplBase { + private static final class AuthGrpcServiceImpl extends SimpleAuthServiceGrpc.AuthServiceImplBase { @Override - public Mono authenticatedMethod(final Empty request) { - return Mono.just(Empty.getDefaultInstance()); + public Empty authenticatedMethod(final Empty request) { + return Empty.getDefaultInstance(); } } - private static final class AnonymousGrpcServiceImpl extends ReactorAnonymousServiceGrpc.AnonymousServiceImplBase { + private static final class AnonymousGrpcServiceImpl extends SimpleAnonymousServiceGrpc.AnonymousServiceImplBase { @Override - public Mono anonymousMethod(final Empty request) { - return Mono.just(Empty.getDefaultInstance()); + public Empty anonymousMethod(final Empty request) { + return Empty.getDefaultInstance(); } }