diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupAuthManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupAuthManager.java index e641a7560..254270480 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupAuthManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupAuthManager.java @@ -40,7 +40,6 @@ import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.RedeemedReceiptsManager; -import org.whispersystems.textsecuregcm.util.Util; /** * Issues ZK backup auth credentials for authenticated accounts @@ -57,7 +56,6 @@ public class BackupAuthManager { private static final Logger logger = LoggerFactory.getLogger(BackupAuthManager.class); - final static Duration MAX_REDEMPTION_DURATION = Duration.ofDays(7); final static String BACKUP_MEDIA_EXPERIMENT_NAME = "backupMedia"; private final ExperimentEnrollmentManager experimentEnrollmentManager; @@ -94,14 +92,13 @@ public class BackupAuthManager { * message backups * @param mediaBackupCredentialRequest A request containing the blinded backup-id the client will use to upload * media backups - * @return A future that completes when the credentialRequest has been stored * @throws RateLimitExceededException If too many backup-ids have been committed */ - public CompletableFuture commitBackupId( + public void commitBackupId( final Account account, final Device device, final Optional messagesBackupCredentialRequest, - final Optional mediaBackupCredentialRequest) { + final Optional mediaBackupCredentialRequest) throws RateLimitExceededException { if (!device.isPrimary()) { throw Status.PERMISSION_DENIED.withDescription("Only primary device can set backup-id").asRuntimeException(); } @@ -133,33 +130,24 @@ public class BackupAuthManager { if (!requiresMessageRotation && !requiresMediaRotation) { // No need to update or enforce rate limits, this is the credential that the user has already // committed to. - return CompletableFuture.completedFuture(null); + return; } - CompletableFuture rateLimitFuture = CompletableFuture.completedFuture(null); - if (requiresMessageRotation) { - rateLimitFuture = rateLimitFuture.thenCombine( - rateLimiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID).validateAsync(account.getUuid()), - (_, _) -> null); + rateLimiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID).validate(account.getUuid()); } if (requiresMediaRotation && hasActiveVoucher(account)) { - rateLimitFuture = rateLimitFuture.thenCombine( - rateLimiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID).validateAsync(account.getUuid()), - (_, _) -> null); + rateLimiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID).validate(account.getUuid()); } - return rateLimitFuture.thenCompose(ignored -> this.accountsManager - .updateAsync(account, a -> - a.setBackupCredentialRequests(targetMessageCredentialRequest, targetMediaCredentialRequest)) - .thenRun(Util.NOOP)) - .toCompletableFuture(); + this.accountsManager.update(account, a -> + a.setBackupCredentialRequests(targetMessageCredentialRequest, targetMediaCredentialRequest)); } public record BackupIdRotationLimit(boolean hasPermitsRemaining, Duration nextPermitAvailable) {} - public CompletionStage checkBackupIdRotationLimit(final Account account) { + public BackupIdRotationLimit checkBackupIdRotationLimit(final Account account) { final RateLimiter messagesLimiter = rateLimiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID); final RateLimiter mediaLimiter = rateLimiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID); @@ -180,7 +168,7 @@ public class BackupAuthManager { isPaid ? mediaLimiter.config().permitRegenerationDuration() : Duration.ZERO)); return new BackupIdRotationLimit(false, timeToNextPermit); } - }); + }).toCompletableFuture().join(); } public record Credential(BackupAuthCredentialResponse credential, Instant redemptionTime) {} @@ -200,19 +188,20 @@ public class BackupAuthManager { * @param redemptionRange The time range to return credentials for * @return Credentials and the day on which they may be redeemed */ - public CompletableFuture> getBackupAuthCredentials( + public List getBackupAuthCredentials( final Account account, final BackupCredentialType credentialType, final RedemptionRange redemptionRange) { // If the account has an expired payment, clear it before continuing if (hasExpiredVoucher(account)) { - return accountsManager.updateAsync(account, a -> { + final Account updated = accountsManager.update(account, a -> { // Re-check in case we raced with an update if (hasExpiredVoucher(a)) { a.setBackupVoucher(null); } - }).thenCompose(updated -> getBackupAuthCredentials(updated, credentialType, redemptionRange)); + }); + return getBackupAuthCredentials(updated, credentialType, redemptionRange); } // fetch the blinded backup-id the account should have previously committed to @@ -224,7 +213,7 @@ public class BackupAuthManager { // create a credential for every day in the requested period final BackupAuthCredentialRequest credentialReq = new BackupAuthCredentialRequest(committedBytes); - return CompletableFuture.completedFuture(StreamSupport.stream(redemptionRange.spliterator(), false) + return StreamSupport.stream(redemptionRange.spliterator(), false) .map(redemptionTime -> { // Check if the account has a voucher that's good for a certain receiptLevel at redemption time, otherwise // use the default receipt level @@ -233,7 +222,7 @@ public class BackupAuthManager { credentialReq.issueCredential(redemptionTime, backupLevel, credentialType, serverSecretParams), redemptionTime); }) - .toList()); + .toList(); } catch (InvalidInputException e) { throw Status.INTERNAL .withDescription("Could not deserialize stored request credential") @@ -247,9 +236,8 @@ public class BackupAuthManager { * * @param account The account to enable backups on * @param receiptCredentialPresentation A ZK receipt presentation proving payment - * @return A future that completes successfully when the account has been updated */ - public CompletableFuture redeemReceipt( + public void redeemReceipt( final Account account, final ReceiptCredentialPresentation receiptCredentialPresentation) { try { @@ -279,16 +267,15 @@ public class BackupAuthManager { .asRuntimeException(); } - return redeemedReceiptsManager + boolean receiptAllowed = redeemedReceiptsManager .put(receiptSerial, receiptExpiration.getEpochSecond(), receiptLevel, account.getUuid()) - .thenCompose(receiptAllowed -> { - if (!receiptAllowed) { - throw Status.INVALID_ARGUMENT - .withDescription("receipt serial is already redeemed") - .asRuntimeException(); - } - return extendBackupVoucher(account, new Account.BackupVoucher(receiptLevel, receiptExpiration)); - }); + .join(); + if (!receiptAllowed) { + throw Status.INVALID_ARGUMENT + .withDescription("receipt serial is already redeemed") + .asRuntimeException(); + } + extendBackupVoucher(account, new Account.BackupVoucher(receiptLevel, receiptExpiration)); } /** @@ -296,11 +283,9 @@ public class BackupAuthManager { * * @param account The account to update * @param backupVoucher The backup voucher to apply to this account - * @return A future that completes once the account has been updated to have at least the level and expiration - * in the provided voucher. */ - public CompletableFuture extendBackupVoucher(final Account account, final Account.BackupVoucher backupVoucher) { - return accountsManager.updateAsync(account, a -> { + public void extendBackupVoucher(final Account account, final Account.BackupVoucher backupVoucher) { + accountsManager.update(account, a -> { // Receipt credential expirations must be day aligned. Make sure any manually set backupVoucher is also day // aligned final Account.BackupVoucher newPayment = new Account.BackupVoucher( @@ -308,7 +293,7 @@ public class BackupAuthManager { backupVoucher.expiration().truncatedTo(ChronoUnit.DAYS)); final Account.BackupVoucher existingPayment = a.getBackupVoucher(); a.setBackupVoucher(merge(existingPayment, newPayment)); - }).thenRun(Util.NOOP); + }); } private static Account.BackupVoucher merge(@Nullable final Account.BackupVoucher prev, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java index 19146de6e..d752f59c4 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/backup/BackupManager.java @@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.backup; import com.google.common.annotations.VisibleForTesting; import io.dropwizard.util.DataSize; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -42,12 +43,12 @@ import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicBackupConfiguration; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; +import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient; import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; -import org.whispersystems.textsecuregcm.util.AsyncTimerUtil; import org.whispersystems.textsecuregcm.util.ExceptionUtils; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException; @@ -128,7 +129,7 @@ public class BackupManager { * @param signature the signature of the presentation * @param publicKey the public key of a key-pair that the presentation must be signed with */ - public CompletableFuture setPublicKey( + public void setPublicKey( final BackupAuthCredentialPresentation presentation, final byte[] signature, final ECPublicKey publicKey) { @@ -139,8 +140,10 @@ public class BackupManager { final Pair credentialTypeAndBackupLevel = verifyPresentation(presentation).verifySignature(signature, publicKey); - return backupsDb.setPublicKey(presentation.getBackupId(), credentialTypeAndBackupLevel.second(), publicKey) - .exceptionally(ExceptionUtils.exceptionallyHandler(PublicKeyConflictException.class, ex -> { + ExceptionUtils.unwrapSupply( + PublicKeyConflictException.class, + () -> backupsDb.setPublicKey(presentation.getBackupId(), credentialTypeAndBackupLevel.second(), publicKey).join(), + _ -> { Metrics.counter(ZK_AUTHN_COUNTER_NAME, SUCCESS_TAG_NAME, String.valueOf(false), FAILURE_REASON_TAG_NAME, "public_key_conflict") @@ -148,7 +151,7 @@ public class BackupManager { throw Status.UNAUTHENTICATED .withDescription("public key does not match existing public key for the backup-id") .asRuntimeException(); - })); + }); } /** @@ -159,31 +162,27 @@ public class BackupManager { * @param backupUser an already ZK authenticated backup user * @return the upload form */ - public CompletableFuture createMessageBackupUploadDescriptor( + public BackupUploadDescriptor createMessageBackupUploadDescriptor( final AuthenticatedBackupUser backupUser) { checkBackupLevel(backupUser, BackupLevel.FREE); checkBackupCredentialType(backupUser, BackupCredentialType.MESSAGES); // this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp - return backupsDb - .addMessageBackup(backupUser) - .thenApply(_ -> - cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser))); + backupsDb.addMessageBackup(backupUser).join(); + return cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)); } - public CompletableFuture createTemporaryAttachmentUploadDescriptor( - final AuthenticatedBackupUser backupUser) { + public BackupUploadDescriptor createTemporaryAttachmentUploadDescriptor(final AuthenticatedBackupUser backupUser) + throws RateLimitExceededException { checkBackupLevel(backupUser, BackupLevel.PAID); checkBackupCredentialType(backupUser, BackupCredentialType.MEDIA); - return rateLimiters.forDescriptor(RateLimiters.For.BACKUP_ATTACHMENT) - .validateAsync(rateLimitKey(backupUser)).thenApply(ignored -> { - final byte[] bytes = new byte[15]; - secureRandom.nextBytes(bytes); - final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes); - final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey); - return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation()); - }).toCompletableFuture(); + rateLimiters.forDescriptor(RateLimiters.For.BACKUP_ATTACHMENT).validate(rateLimitKey(backupUser)); + final byte[] bytes = new byte[15]; + secureRandom.nextBytes(bytes); + final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes); + final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey); + return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation()); } /** @@ -191,36 +190,35 @@ public class BackupManager { * * @param backupUser an already ZK authenticated backup user */ - public CompletableFuture ttlRefresh(final AuthenticatedBackupUser backupUser) { + public void ttlRefresh(final AuthenticatedBackupUser backupUser) { checkBackupLevel(backupUser, BackupLevel.FREE); // update message backup TTL - return backupsDb.ttlRefresh(backupUser).thenAccept(storedBackupAttributes -> { - if (backupUser.credentialType() == BackupCredentialType.MEDIA) { - final long maxTotalMediaSize = - dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxTotalMediaSize(); + final StoredBackupAttributes storedBackupAttributes = backupsDb.ttlRefresh(backupUser).join(); + if (backupUser.credentialType() == BackupCredentialType.MEDIA) { + final long maxTotalMediaSize = + dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxTotalMediaSize(); - // Report that the backup is out of quota if it cannot store a max size media object - final boolean quotaExhausted = storedBackupAttributes.bytesUsed() >= - (maxTotalMediaSize - BackupManager.MAX_MEDIA_OBJECT_SIZE); + // Report that the backup is out of quota if it cannot store a max size media object + final boolean quotaExhausted = storedBackupAttributes.bytesUsed() >= + (maxTotalMediaSize - BackupManager.MAX_MEDIA_OBJECT_SIZE); - final Tags tags = Tags.of( - UserAgentTagUtil.getPlatformTag(backupUser.userAgent()), - Tag.of("type", backupUser.credentialType().name()), - Tag.of("tier", backupUser.backupLevel().name()), - Tag.of("quotaExhausted", String.valueOf(quotaExhausted))); + final Tags tags = Tags.of( + UserAgentTagUtil.getPlatformTag(backupUser.userAgent()), + Tag.of("type", backupUser.credentialType().name()), + Tag.of("tier", backupUser.backupLevel().name()), + Tag.of("quotaExhausted", String.valueOf(quotaExhausted))); - DistributionSummary.builder(NUM_OBJECTS_SUMMARY_NAME) - .tags(tags) - .publishPercentileHistogram() - .register(Metrics.globalRegistry) - .record(storedBackupAttributes.numObjects()); - DistributionSummary.builder(BYTES_USED_SUMMARY_NAME) - .tags(tags) - .publishPercentileHistogram() - .register(Metrics.globalRegistry) - .record(storedBackupAttributes.bytesUsed()); - } - }); + DistributionSummary.builder(NUM_OBJECTS_SUMMARY_NAME) + .tags(tags) + .publishPercentileHistogram() + .register(Metrics.globalRegistry) + .record(storedBackupAttributes.numObjects()); + DistributionSummary.builder(BYTES_USED_SUMMARY_NAME) + .tags(tags) + .publishPercentileHistogram() + .register(Metrics.globalRegistry) + .record(storedBackupAttributes.bytesUsed()); + } } public record BackupInfo(int cdn, String backupSubdir, String mediaSubdir, String messageBackupKey, @@ -232,15 +230,15 @@ public class BackupManager { * @param backupUser an already ZK authenticated backup user * @return Information about the existing backup */ - public CompletableFuture backupInfo(final AuthenticatedBackupUser backupUser) { + public BackupInfo backupInfo(final AuthenticatedBackupUser backupUser) { checkBackupLevel(backupUser, BackupLevel.FREE); - return backupsDb.describeBackup(backupUser) - .thenApply(backupDescription -> new BackupInfo( - backupDescription.cdn(), - backupUser.backupDir(), - backupUser.mediaDir(), - MESSAGE_BACKUP_NAME, - backupDescription.mediaUsedSpace())); + final BackupsDb.BackupDescription backupDescription = backupsDb.describeBackup(backupUser).join(); + return new BackupInfo( + backupDescription.cdn(), + backupUser.backupDir(), + backupUser.mediaDir(), + MESSAGE_BACKUP_NAME, + backupDescription.mediaUsedSpace()); } /** @@ -461,45 +459,47 @@ public class BackupManager { * @param limit The maximum number of list results to return * @return A {@link ListMediaResult} */ - public CompletionStage list( + public ListMediaResult list( final AuthenticatedBackupUser backupUser, final Optional cursor, final int limit) { checkBackupLevel(backupUser, BackupLevel.FREE); - return remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit) - .thenApply(result -> - new ListMediaResult( - result - .objects() - .stream() - .map(entry -> new StorageDescriptorWithLength( - remoteStorageManager.cdnNumber(), - decodeMediaIdFromCdn(entry.key()), - entry.length() - )) - .toList(), - result.cursor() - )); + final RemoteStorageManager.ListResult result = + remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit).toCompletableFuture().join(); + return new ListMediaResult(result + .objects() + .stream() + .map(entry -> new StorageDescriptorWithLength( + remoteStorageManager.cdnNumber(), + decodeMediaIdFromCdn(entry.key()), + entry.length() + )) + .toList(), + result.cursor()); } - public CompletableFuture deleteEntireBackup(final AuthenticatedBackupUser backupUser) { + public void deleteEntireBackup(final AuthenticatedBackupUser backupUser) { checkBackupLevel(backupUser, BackupLevel.FREE); final int deletionConcurrency = dynamicConfigurationManager.getConfiguration().getBackupConfiguration().deletionConcurrency(); // Clients only include SVRB data with their messages backup-id - final CompletableFuture svrbRemoval = switch(backupUser.credentialType()) { - case BackupCredentialType.MESSAGES -> secureValueRecoveryBClient.removeData(svrbIdentifier(backupUser)); - case BackupCredentialType.MEDIA -> CompletableFuture.completedFuture(null); - }; - return svrbRemoval.thenCompose(_ -> backupsDb - // Try to swap out the backupDir for the user - .scheduleBackupDeletion(backupUser) + if (backupUser.credentialType() == BackupCredentialType.MESSAGES) { + secureValueRecoveryBClient.removeData(svrbIdentifier(backupUser)).join(); + } + try { + // Try to swap out the backupDir for the user + backupsDb.scheduleBackupDeletion(backupUser).join(); + } catch (Exception e) { + final Throwable unwrapped = ExceptionUtils.unwrap(e); + if (unwrapped instanceof BackupsDb.PendingDeletionException) { // If there was already a pending swap, try to delete the cdn objects directly - .exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class, e -> - AsyncTimerUtil.record(SYNCHRONOUS_DELETE_TIMER, () -> - deletePrefix(backupUser.backupDir(), deletionConcurrency))))); + SYNCHRONOUS_DELETE_TIMER.record(() -> deletePrefix(backupUser.backupDir(), deletionConcurrency).join()); + } else { + throw e; + } + } } @@ -615,11 +615,34 @@ public class BackupManager { * @param signature An XEd25519 signature of the presentation bytes * @return On authentication success, the authenticated backup-id and backup-tier encoded in the presentation */ - public CompletableFuture authenticateBackupUser( + public AuthenticatedBackupUser authenticateBackupUser( + final BackupAuthCredentialPresentation presentation, + final byte[] signature, + final String userAgentString) { + return ExceptionUtils.unwrapSupply( + StatusRuntimeException.class, + () -> authenticateBackupUserAsync(presentation, signature, userAgentString).join()); + } + + /** + * Authenticate the ZK anonymous backup credential's presentation + *

+ * This validates: + *

  • The presentation was for a credential issued by the server
  • + *
  • The credential is in its redemption window
  • + *
  • The backup-id matches a previously committed blinded backup-id and server issued receipt level
  • + *
  • The signature of the credential matches an existing publicKey associated with this backup-id
  • + * + * @param presentation A {@link BackupAuthCredentialPresentation} + * @param signature An XEd25519 signature of the presentation bytes + * @return A future that completes with the authenticated backup-id and backup-tier encoded in the presentation + */ + public CompletableFuture authenticateBackupUserAsync( final BackupAuthCredentialPresentation presentation, final byte[] signature, final String userAgentString) { final PresentationSignatureVerifier signatureVerifier = verifyPresentation(presentation); + return backupsDb .retrieveAuthenticationData(presentation.getBackupId()) .thenApply(optionalAuthenticationData -> { @@ -701,7 +724,6 @@ public class BackupManager { * List and delete all files associated with a prefix * * @param prefixToDelete The prefix to expire. - * @return A stage that completes when all objects with the given prefix have been deleted */ private CompletableFuture deletePrefix(final String prefixToDelete, int concurrentDeletes) { if (prefixToDelete.length() != BackupsDb.BACKUP_DIRECTORY_PATH_LENGTH diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java index bd34ee9dd..3476ae54e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/ArchiveController.java @@ -47,28 +47,28 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.time.Clock; import java.time.Instant; -import java.util.Arrays; import java.util.Base64; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.glassfish.jersey.server.ManagedAsync; import org.signal.libsignal.protocol.ecc.ECPublicKey; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest; import org.signal.libsignal.zkgroup.backups.BackupCredentialType; import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation; +import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.auth.RedemptionRange; import org.whispersystems.textsecuregcm.backup.BackupAuthManager; import org.whispersystems.textsecuregcm.backup.BackupManager; +import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor; import org.whispersystems.textsecuregcm.backup.CopyParameters; import org.whispersystems.textsecuregcm.backup.CopyResult; import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters; @@ -83,8 +83,6 @@ import org.whispersystems.textsecuregcm.util.ByteArrayAdapter; import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter; import org.whispersystems.textsecuregcm.util.ECPublicKeyAdapter; import org.whispersystems.textsecuregcm.util.ExactlySize; -import org.whispersystems.textsecuregcm.util.Util; -import reactor.core.publisher.Mono; @Path("/v1/archives") @io.swagger.v3.oas.annotations.tags.Tag(name = "Archive") @@ -149,24 +147,20 @@ public class ArchiveController { @ApiResponse(responseCode = "400", description = "The provided backup auth credential request was invalid") @ApiResponse(responseCode = "403", description = "The device did not have permission to set the backup-id. Only the primary device can set the backup-id for an account") @ApiResponse(responseCode = "429", description = "Rate limited. Too many attempts to change the backup-id have been made") - public CompletionStage setBackupId( + @ManagedAsync + public void setBackupId( @Auth final AuthenticatedDevice authenticatedDevice, @Valid @NotNull final SetBackupIdRequest setBackupIdRequest) throws RateLimitExceededException { - return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()) - .thenCompose(maybeAccount -> { - final Account account = maybeAccount - .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); + final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier()) + .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); + final Device device = account.getDevice(authenticatedDevice.deviceId()) + .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); - final Device device = account.getDevice(authenticatedDevice.deviceId()) - .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); - - return backupAuthManager - .commitBackupId(account, device, - Optional.ofNullable(setBackupIdRequest.messagesBackupAuthCredentialRequest), - Optional.ofNullable(setBackupIdRequest.mediaBackupAuthCredentialRequest)) - .thenApply(Util.ASYNC_EMPTY_RESPONSE); - }); + backupAuthManager + .commitBackupId(account, device, + Optional.ofNullable(setBackupIdRequest.messagesBackupAuthCredentialRequest), + Optional.ofNullable(setBackupIdRequest.mediaBackupAuthCredentialRequest)); } @@ -186,15 +180,13 @@ public class ArchiveController { """) @ApiResponse(responseCode = "200", description = "Successfully retrieved backup-id rotation limits", useReturnTypeSchema = true) @ApiResponse(responseCode = "403", description = "Invalid account authentication") - public CompletionStage checkLimits(@Auth final AuthenticatedDevice authenticatedDevice) { - return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()) - .thenCompose(maybeAccount -> { - final Account account = maybeAccount - .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); + @ManagedAsync + public BackupIdLimitResponse checkLimits(@Auth final AuthenticatedDevice authenticatedDevice) { + final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier()) + .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); - return backupAuthManager.checkBackupIdRotationLimit(account).thenApply(limit -> - new BackupIdLimitResponse(limit.hasPermitsRemaining(), limit.nextPermitAvailable().getSeconds())); - }); + final BackupAuthManager.BackupIdRotationLimit limit = backupAuthManager.checkBackupIdRotationLimit(account); + return new BackupIdLimitResponse(limit.hasPermitsRemaining(), limit.nextPermitAvailable().getSeconds()); } public record RedeemBackupReceiptRequest( @@ -237,18 +229,15 @@ public class ArchiveController { @ApiResponse(responseCode = "400", description = "The provided presentation or receipt was invalid") @ApiResponse(responseCode = "409", description = "The target account does not have a backup-id commitment") @ApiResponse(responseCode = "429", description = "Rate limited.") - public CompletionStage redeemReceipt( + @ManagedAsync + public void redeemReceipt( @Auth final AuthenticatedDevice authenticatedDevice, @Valid @NotNull final RedeemBackupReceiptRequest redeemBackupReceiptRequest) { - return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()) - .thenCompose(maybeAccount -> { - final Account account = maybeAccount + final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier()) .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); - return backupAuthManager.redeemReceipt(account, redeemBackupReceiptRequest.receiptCredentialPresentation()) - .thenApply(Util.ASYNC_EMPTY_RESPONSE); - }); + backupAuthManager.redeemReceipt(account, redeemBackupReceiptRequest.receiptCredentialPresentation()); } public record BackupAuthCredentialsResponse( @@ -306,14 +295,15 @@ public class ArchiveController { @ApiResponse(responseCode = "400", description = "The start/end did not meet alignment/duration requirements") @ApiResponse(responseCode = "404", description = "Could not find an existing blinded backup id") @ApiResponse(responseCode = "429", description = "Rate limited.") - public CompletionStage getBackupZKCredentials( + @ManagedAsync + public BackupAuthCredentialsResponse getBackupZKCredentials( @Auth AuthenticatedDevice authenticatedDevice, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @NotNull @QueryParam("redemptionStartSeconds") Long startSeconds, @NotNull @QueryParam("redemptionEndSeconds") Long endSeconds) { final Map> credentialsByType = - new ConcurrentHashMap<>(); + new HashMap<>(); final RedemptionRange redemptionRange; try { @@ -322,33 +312,26 @@ public class ArchiveController { throw new BadRequestException(e.getMessage()); } - return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()) - .thenCompose(maybeAccount -> { - final Account account = maybeAccount - .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); + final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier()) + .orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED)); - return CompletableFuture.allOf(Arrays.stream(BackupCredentialType.values()) - .map(credentialType -> this.backupAuthManager.getBackupAuthCredentials( - account, - credentialType, - redemptionRange) - .thenAccept(credentials -> { - backupMetrics.updateGetCredentialCounter( - UserAgentTagUtil.getPlatformTag(userAgent), - credentialType, - credentials.size()); - credentialsByType.put(credentialType, credentials.stream() - .map(credential -> new BackupAuthCredentialsResponse.BackupAuthCredential( - credential.credential().serialize(), - credential.redemptionTime().getEpochSecond())) - .toList()); - })) - .toArray(CompletableFuture[]::new)) - .thenApply(ignored -> new BackupAuthCredentialsResponse(credentialsByType.entrySet().stream() - .collect(Collectors.toMap( - e -> BackupAuthCredentialsResponse.CredentialType.fromLibsignalType(e.getKey()), - Map.Entry::getValue)))); - }); + for (BackupCredentialType credentialType : BackupCredentialType.values()) { + final List credentials = + backupAuthManager.getBackupAuthCredentials(account, credentialType, redemptionRange); + backupMetrics.updateGetCredentialCounter( + UserAgentTagUtil.getPlatformTag(userAgent), + credentialType, + credentials.size()); + credentialsByType.put(credentialType, credentials.stream() + .map(credential -> new BackupAuthCredentialsResponse.BackupAuthCredential( + credential.credential().serialize(), + credential.redemptionTime().getEpochSecond())) + .toList()); + } + return new BackupAuthCredentialsResponse(credentialsByType.entrySet().stream() + .collect(Collectors.toMap( + e -> BackupAuthCredentialsResponse.CredentialType.fromLibsignalType(e.getKey()), + Map.Entry::getValue))); } @@ -410,7 +393,8 @@ public class ArchiveController { @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ReadAuthResponse.class))) @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage readAuth( + @ManagedAsync + public ReadAuthResponse readAuth( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -426,9 +410,9 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenApply(user -> backupManager.generateReadAuth(user, cdn)) - .thenApply(ReadAuthResponse::new); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + return new ReadAuthResponse(backupManager.generateReadAuth(backupUser, cdn)); } @GET @@ -441,7 +425,8 @@ public class ArchiveController { """) @ApiResponse(responseCode = "200", description = "`JSON` with generated credentials.", useReturnTypeSchema = true) @ApiResponseZkAuth - public CompletionStage svrbAuth( + @ManagedAsync + public ExternalServiceCredentials svrbAuth( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -455,9 +440,9 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager - .authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenApply(backupManager::generateSvrbAuth); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + return backupManager.generateSvrbAuth(backupUser); } public record BackupInfoResponse( @@ -491,7 +476,8 @@ public class ArchiveController { @ApiResponse(responseCode = "404", description = "No existing backups found") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage backupInfo( + @ManagedAsync + public BackupInfoResponse backupInfo( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -506,14 +492,15 @@ public class ArchiveController { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(backupManager::backupInfo) - .thenApply(backupInfo -> new BackupInfoResponse( + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + final BackupManager.BackupInfo backupInfo = backupManager.backupInfo(backupUser); + return new BackupInfoResponse( backupInfo.cdn(), backupInfo.backupSubdir(), backupInfo.mediaSubdir(), backupInfo.messageBackupKey(), - backupInfo.mediaUsedSpace().orElse(0L))); + backupInfo.mediaUsedSpace().orElse(0L)); } public record SetPublicKeyRequest( @@ -531,13 +518,14 @@ public class ArchiveController { summary = "Set public key", description = """ Permanently set the public key of an ED25519 key-pair for the backup-id. All requests that provide a anonymous - BackupAuthCredentialPresentation (including this one!) must also sign the presentation with the private key + BackupAuthCredentialPresentation (including this one!) must also sign the presentation with the private key corresponding to the provided public key. """) @ApiResponseZkAuth @ApiResponse(responseCode = "204", description = "The public key was set") @ApiResponse(responseCode = "429", description = "Rate limited.") - public CompletionStage setPublicKey( + @ManagedAsync + public void setPublicKey( @Auth final Optional account, @Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class)) @@ -552,9 +540,7 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager - .setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey) - .thenApply(Util.ASYNC_EMPTY_RESPONSE); + backupManager.setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey); } @@ -578,7 +564,8 @@ public class ArchiveController { @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponse(responseCode = "413", description = "The provided uploadLength is larger than the maximum supported upload size. The maximum upload size is subject to change.") @ApiResponseZkAuth - public CompletionStage backup( + @ManagedAsync + public UploadDescriptorResponse backup( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -595,22 +582,25 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(backupUser -> { - final boolean oversize = uploadLength - .map(length -> length > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE) - .orElse(false); - backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, uploadLength); - if (oversize) { - throw new ClientErrorException("exceeded maximum uploadLength", Response.Status.REQUEST_ENTITY_TOO_LARGE); - } - return backupManager.createMessageBackupUploadDescriptor(backupUser); - }) - .thenApply(result -> new UploadDescriptorResponse( - result.cdn(), - result.key(), - result.headers(), - result.signedUploadLocation())); + + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + + final boolean oversize = uploadLength + .map(length -> length > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE) + .orElse(false); + + backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, uploadLength); + if (oversize) { + throw new ClientErrorException("exceeded maximum uploadLength", Response.Status.REQUEST_ENTITY_TOO_LARGE); + } + final BackupUploadDescriptor uploadDescriptor = + backupManager.createMessageBackupUploadDescriptor(backupUser); + return new UploadDescriptorResponse( + uploadDescriptor.cdn(), + uploadDescriptor.key(), + uploadDescriptor.headers(), + uploadDescriptor.signedUploadLocation()); } @GET @@ -627,7 +617,8 @@ public class ArchiveController { @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = UploadDescriptorResponse.class))) @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage uploadTemporaryAttachment( + @ManagedAsync + public UploadDescriptorResponse uploadTemporaryAttachment( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -638,17 +629,20 @@ public class ArchiveController { @Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class)) @NotNull - @HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature) { + @HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature) + throws RateLimitExceededException { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(backupManager::createTemporaryAttachmentUploadDescriptor) - .thenApply(result -> new UploadDescriptorResponse( - result.cdn(), - result.key(), - result.headers(), - result.signedUploadLocation())); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + final BackupUploadDescriptor uploadDescriptor = + backupManager.createTemporaryAttachmentUploadDescriptor(backupUser); + return new UploadDescriptorResponse( + uploadDescriptor.cdn(), + uploadDescriptor.key(), + uploadDescriptor.headers(), + uploadDescriptor.signedUploadLocation()); } public record CopyMediaRequest( @@ -715,7 +709,8 @@ public class ArchiveController { @ApiResponse(responseCode = "410", description = "The source object was not found.") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage copyMedia( + @ManagedAsync + public CopyMediaResponse copyMedia( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -733,19 +728,20 @@ public class ArchiveController { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return Mono - .fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)) - .flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters())) - .next() - .doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent))) - .map(copyResult -> switch (copyResult.outcome()) { - case SUCCESS -> new CopyMediaResponse(copyResult.cdn()); - case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length"); - case SOURCE_NOT_FOUND -> throw new ClientErrorException("Source object not found", Response.Status.GONE); - case OUT_OF_QUOTA -> - throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE); - })) - .toFuture(); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + final CopyResult copyResult = + backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters())).next() + .blockOptional() + .orElseThrow(() -> new IllegalStateException("Non empty copy request must return result")); + backupMetrics.updateCopyCounter(copyResult, UserAgentTagUtil.getPlatformTag(userAgent)); + return switch (copyResult.outcome()) { + case SUCCESS -> new CopyMediaResponse(copyResult.cdn()); + case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length"); + case SOURCE_NOT_FOUND -> throw new ClientErrorException("Source object not found", Response.Status.GONE); + case OUT_OF_QUOTA -> + throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE); + }; } public record CopyMediaBatchRequest( @@ -808,13 +804,14 @@ public class ArchiveController { be provided as a separate entry in the response. """) @ApiResponse(responseCode = "207", description = """ - The request was processed and each operation's outcome must be inspected individually. This does NOT necessarily + The request was processed and each operation's outcome must be inspected individually. This does NOT necessarily indicate the operation was a success. """, content = @Content(schema = @Schema(implementation = CopyMediaBatchResponse.class))) @ApiResponse(responseCode = "413", description = "All media capacity has been consumed. Free some space to continue.") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage copyMedia( + @ManagedAsync + public Response copyMedia( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -833,13 +830,13 @@ public class ArchiveController { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } final Stream copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters); - return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)) - .flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList())) + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + final List copyResults = backupManager.copyToBackup(backupUser, copyParams.toList()) .doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent))) .map(CopyMediaBatchResponse.Entry::fromCopyResult) - .collectList() - .map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build()) - .toFuture(); + .collectList().block(); + return Response.status(207).entity(new CopyMediaBatchResponse(copyResults)).build(); } @POST @@ -853,7 +850,8 @@ public class ArchiveController { @ApiResponse(responseCode = "204", description = "The backup was successfully refreshed") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage refresh( + @ManagedAsync + public void refresh( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -867,10 +865,9 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager - .authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(backupManager::ttlRefresh) - .thenApply(Util.ASYNC_EMPTY_RESPONSE); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + backupManager.ttlRefresh(backupUser); } record StoredMediaObject( @@ -920,7 +917,8 @@ public class ArchiveController { @ApiResponse(responseCode = "400", description = "Invalid cursor or limit") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage listMedia( + @ManagedAsync + public ListResponse listMedia( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -940,16 +938,16 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager - .authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(backupUser -> backupManager.list(backupUser, cursor, limit.orElse(1000)) - .thenApply(result -> new ListResponse( - result.media() - .stream().map(entry -> new StoredMediaObject(entry.cdn(), entry.key(), entry.length())) - .toList(), - backupUser.backupDir(), - backupUser.mediaDir(), - result.cursor().orElse(null)))); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + final BackupManager.ListMediaResult listResult = + backupManager.list(backupUser, cursor, limit.orElse(1000)); + return new ListResponse(listResult.media() + .stream().map(entry -> new StoredMediaObject(entry.cdn(), entry.key(), entry.length())) + .toList(), + backupUser.backupDir(), + backupUser.mediaDir(), + listResult.cursor().orElse(null)); } public record DeleteMedia(@Size(min = 1, max = 1000) List<@Valid MediaToDelete> mediaToDelete) { @@ -976,7 +974,8 @@ public class ArchiveController { @ApiResponse(responseCode = "204", description = "The provided objects were successfully deleted or they do not exist") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage deleteMedia( + @ManagedAsync + public void deleteMedia( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -997,12 +996,9 @@ public class ArchiveController { .map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId)) .toList(); - return backupManager - .authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(authenticatedBackupUser -> backupManager - .deleteMedia(authenticatedBackupUser, toDelete) - .then().toFuture()) - .thenApply(Util.ASYNC_EMPTY_RESPONSE); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + backupManager.deleteMedia(backupUser, toDelete).then().block(); } @DELETE @@ -1013,7 +1009,8 @@ public class ArchiveController { @ApiResponse(responseCode = "204", description = "The backup has been successfully removed") @ApiResponse(responseCode = "429", description = "Rate limited.") @ApiResponseZkAuth - public CompletionStage deleteBackup( + @ManagedAsync + public void deleteBackup( @Auth final Optional account, @HeaderParam(HttpHeaders.USER_AGENT) final String userAgent, @@ -1027,10 +1024,10 @@ public class ArchiveController { if (account.isPresent()) { throw new BadRequestException("must not use authenticated connection for anonymous operations"); } - return backupManager - .authenticateBackupUser(presentation.presentation, signature.signature, userAgent) - .thenCompose(backupManager::deleteEntireBackup) - .thenApply(Util.ASYNC_EMPTY_RESPONSE); + final AuthenticatedBackupUser backupUser = + backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent); + + backupManager.deleteEntireBackup(backupUser); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckController.java index 765580d91..b67fc98b6 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckController.java @@ -252,8 +252,7 @@ public class DeviceCheckController { switch (request.assertionRequest().action()) { case BACKUP -> backupAuthManager.extendBackupVoucher( account, - new Account.BackupVoucher(backupRedemptionLevel, clock.instant().plus(backupRedemptionDuration))) - .join(); + new Account.BackupVoucher(backupRedemptionLevel, clock.instant().plus(backupRedemptionDuration))); } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcService.java index 3dd2ee8e6..de8d282eb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcService.java @@ -8,9 +8,7 @@ import com.google.protobuf.ByteString; import io.grpc.Status; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Tags; +import java.util.concurrent.Flow; import org.signal.chat.backup.CopyMediaRequest; import org.signal.chat.backup.CopyMediaResponse; import org.signal.chat.backup.DeleteAllRequest; @@ -27,29 +25,30 @@ import org.signal.chat.backup.GetUploadFormRequest; import org.signal.chat.backup.GetUploadFormResponse; import org.signal.chat.backup.ListMediaRequest; import org.signal.chat.backup.ListMediaResponse; -import org.signal.chat.backup.ReactorBackupsAnonymousGrpc; import org.signal.chat.backup.RefreshRequest; import org.signal.chat.backup.RefreshResponse; import org.signal.chat.backup.SetPublicKeyRequest; import org.signal.chat.backup.SetPublicKeyResponse; import org.signal.chat.backup.SignedPresentation; +import org.signal.chat.backup.SimpleBackupsAnonymousGrpc; import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.ecc.ECPublicKey; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation; import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; +import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.backup.BackupManager; +import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor; import org.whispersystems.textsecuregcm.backup.CopyParameters; import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters; -import org.whispersystems.textsecuregcm.controllers.ArchiveController; +import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.metrics.BackupMetrics; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; +import reactor.adapter.JdkFlowAdapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; - -public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.BackupsAnonymousImplBase { +public class BackupsAnonymousGrpcService extends SimpleBackupsAnonymousGrpc.BackupsAnonymousImplBase { private final BackupManager backupManager; private final BackupMetrics backupMetrics; @@ -60,87 +59,89 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac } @Override - public Mono getCdnCredentials(final GetCdnCredentialsRequest request) { - return authenticateBackupUserMono(request.getSignedPresentation()) - .map(user -> backupManager.generateReadAuth(user, request.getCdn())) - .map(credentials -> GetCdnCredentialsResponse.newBuilder().putAllHeaders(credentials).build()); + public GetCdnCredentialsResponse getCdnCredentials(final GetCdnCredentialsRequest request) { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + return GetCdnCredentialsResponse.newBuilder() + .putAllHeaders(backupManager.generateReadAuth(backupUser, request.getCdn())) + .build(); } @Override - public Mono getSvrBCredentials(final GetSvrBCredentialsRequest request) { - return authenticateBackupUserMono(request.getSignedPresentation()) - .map(backupManager::generateSvrbAuth) - .map(credentials -> GetSvrBCredentialsResponse.newBuilder() - .setUsername(credentials.username()) - .setPassword(credentials.password()) - .build()); + public GetSvrBCredentialsResponse getSvrBCredentials(final GetSvrBCredentialsRequest request) { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + final ExternalServiceCredentials credentials = backupManager.generateSvrbAuth(backupUser); + return GetSvrBCredentialsResponse.newBuilder() + .setUsername(credentials.username()) + .setPassword(credentials.password()) + .build(); } @Override - public Mono getBackupInfo(final GetBackupInfoRequest request) { - return Mono.fromFuture(() -> - authenticateBackupUser(request.getSignedPresentation()).thenCompose(backupManager::backupInfo)) - .map(info -> GetBackupInfoResponse.newBuilder() - .setBackupName(info.messageBackupKey()) - .setCdn(info.cdn()) - .setBackupDir(info.backupSubdir()) - .setMediaDir(info.mediaSubdir()) - .setUsedSpace(info.mediaUsedSpace().orElse(0L)) - .build()); + public GetBackupInfoResponse getBackupInfo(final GetBackupInfoRequest request) { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + final BackupManager.BackupInfo info = backupManager.backupInfo(backupUser); + return GetBackupInfoResponse.newBuilder() + .setBackupName(info.messageBackupKey()) + .setCdn(info.cdn()) + .setBackupDir(info.backupSubdir()) + .setMediaDir(info.mediaSubdir()) + .setUsedSpace(info.mediaUsedSpace().orElse(0L)) + .build(); } @Override - public Mono refresh(final RefreshRequest request) { - return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation()) - .thenCompose(backupManager::ttlRefresh)) - .thenReturn(RefreshResponse.getDefaultInstance()); + public RefreshResponse refresh(final RefreshRequest request) { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + backupManager.ttlRefresh(backupUser); + return RefreshResponse.getDefaultInstance(); } @Override - public Mono setPublicKey(final SetPublicKeyRequest request) { + public SetPublicKeyResponse setPublicKey(final SetPublicKeyRequest request) { final ECPublicKey publicKey = deserialize(ECPublicKey::new, request.getPublicKey().toByteArray()); final BackupAuthCredentialPresentation presentation = deserialize( BackupAuthCredentialPresentation::new, request.getSignedPresentation().getPresentation().toByteArray()); final byte[] signature = request.getSignedPresentation().getPresentationSignature().toByteArray(); - return Mono.fromFuture(() -> backupManager.setPublicKey(presentation, signature, publicKey)) - .thenReturn(SetPublicKeyResponse.getDefaultInstance()); + backupManager.setPublicKey(presentation, signature, publicKey); + return SetPublicKeyResponse.getDefaultInstance(); } @Override - public Mono getUploadForm(final GetUploadFormRequest request) { - return authenticateBackupUserMono(request.getSignedPresentation()) - .flatMap(backupUser -> switch (request.getUploadTypeCase()) { - case MESSAGES -> { - final long uploadLength = request.getMessages().getUploadLength(); - final boolean oversize = uploadLength > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE; - backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, Optional.of(uploadLength)); - if (oversize) { - yield Mono.error(Status.FAILED_PRECONDITION - .withDescription("Exceeds max upload length") - .asRuntimeException()); - } + public GetUploadFormResponse getUploadForm(final GetUploadFormRequest request) throws RateLimitExceededException { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + final BackupUploadDescriptor uploadDescriptor = switch (request.getUploadTypeCase()) { + case MESSAGES -> { + final long uploadLength = request.getMessages().getUploadLength(); + final boolean oversize = uploadLength > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE; + backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, Optional.of(uploadLength)); + if (oversize) { + throw Status.FAILED_PRECONDITION + .withDescription("Exceeds max upload length") + .asRuntimeException(); + } - yield Mono.fromFuture(backupManager.createMessageBackupUploadDescriptor(backupUser)); - } - case MEDIA -> Mono.fromCompletionStage(backupManager.createTemporaryAttachmentUploadDescriptor(backupUser)); - case UPLOADTYPE_NOT_SET -> Mono.error(Status.INVALID_ARGUMENT - .withDescription("Must set upload_type") - .asRuntimeException()); - }) - .map(uploadDescriptor -> GetUploadFormResponse.newBuilder() - .setCdn(uploadDescriptor.cdn()) - .setKey(uploadDescriptor.key()) - .setSignedUploadLocation(uploadDescriptor.signedUploadLocation()) - .putAllHeaders(uploadDescriptor.headers()) - .build()); + yield backupManager.createMessageBackupUploadDescriptor(backupUser); + } + case MEDIA -> backupManager.createTemporaryAttachmentUploadDescriptor(backupUser); + case UPLOADTYPE_NOT_SET -> throw Status.INVALID_ARGUMENT + .withDescription("Must set upload_type") + .asRuntimeException(); + }; + return GetUploadFormResponse.newBuilder() + .setCdn(uploadDescriptor.cdn()) + .setKey(uploadDescriptor.key()) + .setSignedUploadLocation(uploadDescriptor.signedUploadLocation()) + .putAllHeaders(uploadDescriptor.headers()) + .build(); } @Override - public Flux copyMedia(final CopyMediaRequest request) { - return authenticateBackupUserMono(request.getSignedPresentation()) + public Flow.Publisher copyMedia(final CopyMediaRequest request) { + final Flux flux = Mono + .fromFuture(() -> authenticateBackupUserAsync(request.getSignedPresentation())) .flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, request.getItemsList().stream().map(item -> new CopyParameters( item.getSourceAttachmentCdn(), item.getSourceKey(), @@ -167,46 +168,43 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac }; return builder.build(); }); - + return JdkFlowAdapter.publisherToFlowPublisher(flux); } @Override - public Mono listMedia(final ListMediaRequest request) { - return authenticateBackupUserMono(request.getSignedPresentation()).zipWhen( - backupUser -> Mono.fromFuture(backupManager.list( - backupUser, - request.hasCursor() ? Optional.of(request.getCursor()) : Optional.empty(), - request.getLimit()).toCompletableFuture()), - - (backupUser, listResult) -> { - final ListMediaResponse.Builder builder = ListMediaResponse.newBuilder(); - for (BackupManager.StorageDescriptorWithLength sd : listResult.media()) { - builder.addPage(ListMediaResponse.ListEntry.newBuilder() - .setMediaId(ByteString.copyFrom(sd.key())) - .setCdn(sd.cdn()) - .setLength(sd.length()) - .build()); - } - builder - .setBackupDir(backupUser.backupDir()) - .setMediaDir(backupUser.mediaDir()); - listResult.cursor().ifPresent(builder::setCursor); - return builder.build(); - }); + public ListMediaResponse listMedia(final ListMediaRequest request) { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + final BackupManager.ListMediaResult listResult = backupManager.list( + backupUser, + request.hasCursor() ? Optional.of(request.getCursor()) : Optional.empty(), + request.getLimit()); + final ListMediaResponse.Builder builder = ListMediaResponse.newBuilder(); + for (BackupManager.StorageDescriptorWithLength sd : listResult.media()) { + builder.addPage(ListMediaResponse.ListEntry.newBuilder() + .setMediaId(ByteString.copyFrom(sd.key())) + .setCdn(sd.cdn()) + .setLength(sd.length()) + .build()); + } + builder + .setBackupDir(backupUser.backupDir()) + .setMediaDir(backupUser.mediaDir()); + listResult.cursor().ifPresent(builder::setCursor); + return builder.build(); } @Override - public Mono deleteAll(final DeleteAllRequest request) { - return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation()) - .thenCompose(backupManager::deleteEntireBackup)) - .thenReturn(DeleteAllResponse.getDefaultInstance()); + public DeleteAllResponse deleteAll(final DeleteAllRequest request) { + final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation()); + backupManager.deleteEntireBackup(backupUser); + return DeleteAllResponse.getDefaultInstance(); } @Override - public Flux deleteMedia(final DeleteMediaRequest request) { - return Mono - .fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())) + public Flow.Publisher deleteMedia(final DeleteMediaRequest request) { + return JdkFlowAdapter.publisherToFlowPublisher(Mono + .fromFuture(() -> authenticateBackupUserAsync(request.getSignedPresentation())) .flatMapMany(backupUser -> backupManager.deleteMedia(backupUser, request .getItemsList() .stream() @@ -214,20 +212,15 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac .toList())) .map(storageDescriptor -> DeleteMediaResponse.newBuilder() .setMediaId(ByteString.copyFrom(storageDescriptor.key())) - .setCdn(storageDescriptor.cdn()).build()); + .setCdn(storageDescriptor.cdn()).build())); } - private Mono authenticateBackupUserMono(final SignedPresentation signedPresentation) { - return Mono.fromFuture(() -> authenticateBackupUser(signedPresentation)); - } - - private CompletableFuture authenticateBackupUser( - final SignedPresentation signedPresentation) { + private CompletableFuture authenticateBackupUserAsync(final SignedPresentation signedPresentation) { if (signedPresentation == null) { throw Status.UNAUTHENTICATED.asRuntimeException(); } try { - return backupManager.authenticateBackupUser( + return backupManager.authenticateBackupUserAsync( new BackupAuthCredentialPresentation(signedPresentation.getPresentation().toByteArray()), signedPresentation.getPresentationSignature().toByteArray(), RequestAttributesUtil.getUserAgent().orElse(null)); @@ -236,6 +229,10 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac } } + private AuthenticatedBackupUser authenticateBackupUser(final SignedPresentation signedPresentation) { + return authenticateBackupUserAsync(signedPresentation).join(); + } + /** * Convert an int from a proto uint32 to a signed positive integer, throwing if the value exceeds * {@link Integer#MAX_VALUE}. To convert to a long, see {@link Integer#toUnsignedLong(int)} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcService.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcService.java index 4d7ba2dea..929307a0d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcService.java @@ -14,11 +14,11 @@ import java.util.Optional; import java.util.stream.Collectors; import org.signal.chat.backup.GetBackupAuthCredentialsRequest; import org.signal.chat.backup.GetBackupAuthCredentialsResponse; -import org.signal.chat.backup.ReactorBackupsGrpc; import org.signal.chat.backup.RedeemReceiptRequest; import org.signal.chat.backup.RedeemReceiptResponse; import org.signal.chat.backup.SetBackupIdRequest; import org.signal.chat.backup.SetBackupIdResponse; +import org.signal.chat.backup.SimpleBackupsGrpc; import org.signal.chat.common.ZkCredential; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest; @@ -28,14 +28,14 @@ import org.whispersystems.textsecuregcm.auth.RedemptionRange; import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice; import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil; import org.whispersystems.textsecuregcm.backup.BackupAuthManager; +import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.metrics.BackupMetrics; import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; -import reactor.core.publisher.Mono; -public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase { +public class BackupsGrpcService extends SimpleBackupsGrpc.BackupsImplBase { private final AccountsManager accountManager; private final BackupAuthManager backupAuthManager; @@ -48,7 +48,7 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase { } @Override - public Mono setBackupId(SetBackupIdRequest request) { + public SetBackupIdResponse setBackupId(SetBackupIdRequest request) throws RateLimitExceededException { final Optional messagesCredentialRequest = deserializeWithEmptyPresenceCheck( BackupAuthCredentialRequest::new, @@ -59,28 +59,25 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase { request.getMediaBackupAuthCredentialRequest()); final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); - return authenticatedAccount() - .flatMap(account -> { - final Device device = account - .getDevice(authenticatedDevice.deviceId()) - .orElseThrow(Status.UNAUTHENTICATED::asRuntimeException); - return Mono.fromFuture( - backupAuthManager.commitBackupId(account, device, messagesCredentialRequest, mediaCredentialRequest)); - }) - .thenReturn(SetBackupIdResponse.getDefaultInstance()); + final Account account = authenticatedAccount(); + final Device device = account + .getDevice(authenticatedDevice.deviceId()) + .orElseThrow(Status.UNAUTHENTICATED::asRuntimeException); + backupAuthManager.commitBackupId(account, device, messagesCredentialRequest, mediaCredentialRequest); + return SetBackupIdResponse.getDefaultInstance(); } - public Mono redeemReceipt(RedeemReceiptRequest request) { + public RedeemReceiptResponse redeemReceipt(RedeemReceiptRequest request) { final ReceiptCredentialPresentation receiptCredentialPresentation = deserialize( ReceiptCredentialPresentation::new, request.getPresentation().toByteArray()); - return authenticatedAccount() - .flatMap(account -> Mono.fromFuture(backupAuthManager.redeemReceipt(account, receiptCredentialPresentation))) - .thenReturn(RedeemReceiptResponse.getDefaultInstance()); + final Account account = authenticatedAccount(); + backupAuthManager.redeemReceipt(account, receiptCredentialPresentation); + return RedeemReceiptResponse.getDefaultInstance(); } @Override - public Mono getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) { + public GetBackupAuthCredentialsResponse getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) { final Tag platformTag = UserAgentTagUtil.getPlatformTag(RequestAttributesUtil.getUserAgent().orElse(null)); final RedemptionRange redemptionRange; try { @@ -90,46 +87,41 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase { } catch (IllegalArgumentException e) { throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException(); } - return authenticatedAccount().flatMap(account -> { - final Mono> messageCredentials = Mono.fromCompletionStage(() -> - backupAuthManager.getBackupAuthCredentials( - account, - BackupCredentialType.MESSAGES, - redemptionRange)) - .doOnSuccess(credentials -> - backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MESSAGES, credentials.size())); + final Account account = authenticatedAccount(); + final List messageCredentials = + backupAuthManager.getBackupAuthCredentials( + account, + BackupCredentialType.MESSAGES, + redemptionRange); + backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MESSAGES, messageCredentials.size()); - final Mono> mediaCredentials = Mono.fromCompletionStage(() -> - backupAuthManager.getBackupAuthCredentials( - account, - BackupCredentialType.MEDIA, - redemptionRange)) - .doOnSuccess(credentials -> - backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MEDIA, credentials.size())); + final List mediaCredentials = + backupAuthManager.getBackupAuthCredentials( + account, + BackupCredentialType.MEDIA, + redemptionRange); + backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MEDIA, mediaCredentials.size()); - return messageCredentials.zipWith(mediaCredentials, (messageCreds, mediaCreds) -> - GetBackupAuthCredentialsResponse.newBuilder() - .putAllMessageCredentials(messageCreds.stream().collect(Collectors.toMap( - c -> c.redemptionTime().getEpochSecond(), - c -> ZkCredential.newBuilder() - .setCredential(ByteString.copyFrom(c.credential().serialize())) - .setRedemptionTime(c.redemptionTime().getEpochSecond()) - .build()))) - .putAllMediaCredentials(mediaCreds.stream().collect(Collectors.toMap( - c -> c.redemptionTime().getEpochSecond(), - c -> ZkCredential.newBuilder() - .setCredential(ByteString.copyFrom(c.credential().serialize())) - .setRedemptionTime(c.redemptionTime().getEpochSecond()) - .build()))) - .build()); - }); + return GetBackupAuthCredentialsResponse.newBuilder() + .putAllMessageCredentials(messageCredentials.stream().collect(Collectors.toMap( + c -> c.redemptionTime().getEpochSecond(), + c -> ZkCredential.newBuilder() + .setCredential(ByteString.copyFrom(c.credential().serialize())) + .setRedemptionTime(c.redemptionTime().getEpochSecond()) + .build()))) + .putAllMediaCredentials(mediaCredentials.stream().collect(Collectors.toMap( + c -> c.redemptionTime().getEpochSecond(), + c -> ZkCredential.newBuilder() + .setCredential(ByteString.copyFrom(c.credential().serialize())) + .setRedemptionTime(c.redemptionTime().getEpochSecond()) + .build()))) + .build(); } - private Mono authenticatedAccount() { - final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice(); - return Mono - .fromFuture(() -> accountManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())) - .map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException)); + private Account authenticatedAccount() { + return accountManager + .getByAccountIdentifier(AuthenticationUtil.requireAuthenticatedDevice().accountIdentifier()) + .orElseThrow(Status.UNAUTHENTICATED::asRuntimeException); } private interface Deserializer { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptor.java b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptor.java index eeddbe203..2322956b9 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptor.java @@ -14,6 +14,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.util.ExceptionUtils; import java.io.IOException; import java.io.UncheckedIOException; @@ -46,7 +47,9 @@ public class ErrorMappingInterceptor implements ServerInterceptor { return; } - final StatusRuntimeException statusException = switch (status.getCause()) { + final Throwable cause = ExceptionUtils.unwrap(status.getCause()); + + final StatusRuntimeException statusException = switch (cause) { case ConvertibleToGrpcStatus e -> e.toStatusRuntimeException(); case UncheckedIOException e -> { log.warn("RPC {} encountered UncheckedIOException", call.getMethodDescriptor().getFullMethodName(), e.getCause()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java index 99a728386..99250e77b 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/ExceptionUtils.java @@ -2,6 +2,7 @@ package org.whispersystems.textsecuregcm.util; import java.util.concurrent.CompletionException; import java.util.function.Function; +import java.util.function.Supplier; public final class ExceptionUtils { @@ -81,4 +82,49 @@ public final class ExceptionUtils { throw wrap(fn.apply(e)); }); } + + /** + * Runs the supplier, throwing a checked exception if the supplier throws an exception that unwraps to the provided type + * + * @param exType The exception type to check for + * @param supplier A supplier that produces a T + * @return The result of the supplier + * @param The supplier type + * @param The checked exception type + * @throws E If the supplier throws E or a type that {@link #unwrap}s to E + */ + public static T unwrapSupply(Class exType, Supplier supplier) throws E { + try { + return supplier.get(); + } catch (RuntimeException e) { + final Throwable ex = unwrap(e); + if (exType.isInstance(ex)) { + throw exType.cast(ex); + } + throw e; + } + } + + /** + * Runs the supplier, throwing a checked exception if the supplier throws an exception that unwraps to the provided type + * + * @param exType The exception type to check for + * @param supplier A supplier that produces a T + * @param marshal A function that maps from the thrown type to another exception type + * @return The result of the supplier + * @param The supplier type + * @param The checked exception type that may be thrown from supplier + * @throws F If the supplier throws E or a type that {@link #unwrap}s to E + */ + public static T unwrapSupply(Class exType, Supplier supplier, Function marshal) throws F { + try { + return supplier.get(); + } catch (RuntimeException e) { + final Throwable ex = unwrap(e); + if (exType.isInstance(ex)) { + throw marshal.apply(exType.cast(ex)); + } + throw e; + } + } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthManagerTest.java index 655d1591a..851fdb115 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthManagerTest.java @@ -12,6 +12,7 @@ import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -64,7 +65,6 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.RedeemedReceiptsManager; import org.whispersystems.textsecuregcm.tests.util.ExperimentHelper; -import org.whispersystems.textsecuregcm.util.CompletableFutureTestUtil; import org.whispersystems.textsecuregcm.util.TestClock; import org.whispersystems.textsecuregcm.util.TestRandomUtil; @@ -107,19 +107,18 @@ public class BackupAuthManagerTest { } @Test - void commitBackupId() { + void commitBackupId() throws RateLimitExceededException { final BackupAuthManager authManager = create(); final Account account = mock(Account.class); when(account.getUuid()).thenReturn(aci); - when(accountsManager.updateAsync(any(), any())) + when(accountsManager.update(any(), any())) .thenAnswer(invocation -> { final Account a = invocation.getArgument(0); final Consumer updater = invocation.getArgument(1); updater.accept(a); - - return CompletableFuture.completedFuture(a); + return a; }); final BackupAuthCredentialRequest messagesCredentialRequest = backupAuthTestUtil.getRequest(messagesBackupKey, aci); @@ -127,7 +126,7 @@ public class BackupAuthManagerTest { authManager.commitBackupId(account, primaryDevice(), Optional.of(messagesCredentialRequest), - Optional.of(mediaCredentialRequest)).join(); + Optional.of(mediaCredentialRequest)); verify(account).setBackupCredentialRequests(messagesCredentialRequest.serialize(), mediaCredentialRequest.serialize()); @@ -138,13 +137,13 @@ public class BackupAuthManagerTest { void commitOnAnyBackupLevel(final BackupLevel backupLevel) { final BackupAuthManager authManager = create(); final Account account = new MockAccountBuilder().backupLevel(backupLevel).build(); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(account)); + when(accountsManager.update(any(), any())).thenReturn(account); final ThrowableAssert.ThrowingCallable commit = () -> authManager.commitBackupId(account, primaryDevice(), Optional.of(backupAuthTestUtil.getRequest(messagesBackupKey, aci)), - Optional.of(backupAuthTestUtil.getRequest(mediaBackupKey, aci))).join(); + Optional.of(backupAuthTestUtil.getRequest(mediaBackupKey, aci))); Assertions.assertThatNoException().isThrownBy(commit); } @@ -152,13 +151,13 @@ public class BackupAuthManagerTest { void commitRequiresPrimary() { final BackupAuthManager authManager = create(); final Account account = new MockAccountBuilder().build(); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(account)); + when(accountsManager.update(any(), any())).thenReturn(account); final ThrowableAssert.ThrowingCallable commit = () -> authManager.commitBackupId(account, linkedDevice(), Optional.of(backupAuthTestUtil.getRequest(messagesBackupKey, aci)), - Optional.of(backupAuthTestUtil.getRequest(mediaBackupKey, aci))).join(); + Optional.of(backupAuthTestUtil.getRequest(mediaBackupKey, aci))); assertThatExceptionOfType(StatusRuntimeException.class) .isThrownBy(commit) .extracting(ex -> ex.getStatus().getCode()) @@ -186,7 +185,7 @@ public class BackupAuthManagerTest { final RedemptionRange range = range(Duration.ofDays(1)); final List creds = - authManager.getBackupAuthCredentials(account, credentialType, range(Duration.ofDays(1))).join(); + authManager.getBackupAuthCredentials(account, credentialType, range(Duration.ofDays(1))); assertThat(creds).hasSize(2); assertThat(requestContext @@ -207,7 +206,7 @@ public class BackupAuthManagerTest { .mediaCredential(backupAuthTestUtil.getRequest(mediaBackupKey, aci)) .build(); - assertThat(authManager.getBackupAuthCredentials(account, credentialType, range(Duration.ofDays(1))).join()) + assertThat(authManager.getBackupAuthCredentials(account, credentialType, range(Duration.ofDays(1)))) .hasSize(2); } @@ -220,7 +219,7 @@ public class BackupAuthManagerTest { assertThatExceptionOfType(StatusRuntimeException.class) .isThrownBy(() -> - authManager.getBackupAuthCredentials(account, credentialType, range(Duration.ofDays(1))).join()) + authManager.getBackupAuthCredentials(account, credentialType, range(Duration.ofDays(1)))) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.NOT_FOUND); } @@ -245,7 +244,7 @@ public class BackupAuthManagerTest { .build(); final List creds = authManager.getBackupAuthCredentials(account, - credentialType, range(Duration.ofDays(7))).join(); + credentialType, range(Duration.ofDays(7))); assertThat(creds).hasSize(8); Instant redemptionTime = clock.instant().truncatedTo(ChronoUnit.DAYS); @@ -275,7 +274,7 @@ public class BackupAuthManagerTest { final List creds = authManager.getBackupAuthCredentials( account, BackupCredentialType.MESSAGES, - range(RedemptionRange.MAX_REDEMPTION_DURATION)).join(); + range(RedemptionRange.MAX_REDEMPTION_DURATION)); Instant redemptionTime = Instant.EPOCH; final BackupAuthCredentialRequestContext requestContext = BackupAuthCredentialRequestContext.create( messagesBackupKey, aci); @@ -311,15 +310,15 @@ public class BackupAuthManagerTest { .backupVoucher(null) .build(); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(updated)); + when(accountsManager.update(any(), any())).thenReturn(updated); clock.pin(day2.plus(Duration.ofSeconds(1))); - assertThat(authManager.getBackupAuthCredentials(account, BackupCredentialType.MESSAGES, range(Duration.ofDays(7))).join()) + assertThat(authManager.getBackupAuthCredentials(account, BackupCredentialType.MESSAGES, range(Duration.ofDays(7)))) .hasSize(8); @SuppressWarnings("unchecked") final ArgumentCaptor> accountUpdater = ArgumentCaptor.forClass( Consumer.class); - verify(accountsManager, times(1)).updateAsync(any(), accountUpdater.capture()); + verify(accountsManager, times(1)).update(any(), accountUpdater.capture()); // If the account is not expired when we go to update it, we shouldn't wipe it out final Account alreadyUpdated = mock(Account.class); @@ -343,11 +342,11 @@ public class BackupAuthManagerTest { .mediaCredential(Optional.of(new byte[0])) .build(); clock.pin(Instant.EPOCH.plus(Duration.ofDays(1))); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(account)); + when(accountsManager.update(any(), any())).thenReturn(account); when(redeemedReceiptsManager.put(any(), eq(expirationTime.getEpochSecond()), eq(201L), eq(aci))) .thenReturn(CompletableFuture.completedFuture(true)); - authManager.redeemReceipt(account, receiptPresentation(201, expirationTime)).join(); - verify(accountsManager, times(1)).updateAsync(any(), any()); + authManager.redeemReceipt(account, receiptPresentation(201, expirationTime)); + verify(accountsManager, times(1)).update(any(), any()); } @Test @@ -361,7 +360,7 @@ public class BackupAuthManagerTest { .thenReturn(CompletableFuture.completedFuture(true)); assertThatExceptionOfType(StatusRuntimeException.class) .isThrownBy(() -> - authManager.redeemReceipt(account, receiptPresentation(201, expirationTime)).join()) + authManager.redeemReceipt(account, receiptPresentation(201, expirationTime))) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.ABORTED); } @@ -379,13 +378,13 @@ public class BackupAuthManagerTest { .build(); clock.pin(Instant.EPOCH.plus(Duration.ofDays(1))); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(account)); + when(accountsManager.update(any(), any())).thenReturn(account); when(redeemedReceiptsManager.put(any(), eq(newExpirationTime.getEpochSecond()), eq(201L), eq(aci))) .thenReturn(CompletableFuture.completedFuture(true)); - authManager.redeemReceipt(account, receiptPresentation(201, newExpirationTime)).join(); + authManager.redeemReceipt(account, receiptPresentation(201, newExpirationTime)); final ArgumentCaptor> updaterCaptor = ArgumentCaptor.captor(); - verify(accountsManager, times(1)).updateAsync(any(), updaterCaptor.capture()); + verify(accountsManager, times(1)).update(any(), updaterCaptor.capture()); updaterCaptor.getValue().accept(account); // Should select the voucher with the later expiration time @@ -398,7 +397,7 @@ public class BackupAuthManagerTest { clock.pin(expirationTime.plus(Duration.ofSeconds(1))); final BackupAuthManager authManager = create(); assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(() -> authManager.redeemReceipt(mock(Account.class), receiptPresentation(3, expirationTime)).join()) + .isThrownBy(() -> authManager.redeemReceipt(mock(Account.class), receiptPresentation(3, expirationTime))) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.INVALID_ARGUMENT); verifyNoInteractions(accountsManager); @@ -413,7 +412,7 @@ public class BackupAuthManagerTest { final BackupAuthManager authManager = create(); assertThatExceptionOfType(StatusRuntimeException.class) .isThrownBy(() -> - authManager.redeemReceipt(mock(Account.class), receiptPresentation(level, expirationTime)).join()) + authManager.redeemReceipt(mock(Account.class), receiptPresentation(level, expirationTime))) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.INVALID_ARGUMENT); verifyNoInteractions(accountsManager); @@ -425,7 +424,7 @@ public class BackupAuthManagerTest { final BackupAuthManager authManager = create(); final ReceiptCredentialPresentation invalid = receiptPresentation(ServerSecretParams.generate(), 3L, Instant.EPOCH); assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(() -> authManager.redeemReceipt(mock(Account.class), invalid).join()) + .isThrownBy(() -> authManager.redeemReceipt(mock(Account.class), invalid)) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.INVALID_ARGUMENT); verifyNoInteractions(accountsManager); @@ -433,7 +432,7 @@ public class BackupAuthManagerTest { } @Test - void receiptAlreadyRedeemed() throws InvalidInputException, VerificationFailedException { + void receiptAlreadyRedeemed() { final Instant expirationTime = Instant.EPOCH.plus(Duration.ofDays(1)); final BackupAuthManager authManager = create(); final Account account = new MockAccountBuilder() @@ -441,12 +440,12 @@ public class BackupAuthManagerTest { .build(); clock.pin(Instant.EPOCH.plus(Duration.ofDays(1))); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(account)); + when(accountsManager.update(any(), any())).thenReturn(account); when(redeemedReceiptsManager.put(any(), eq(expirationTime.getEpochSecond()), eq(201L), eq(aci))) .thenReturn(CompletableFuture.completedFuture(false)); - final CompletableFuture result = authManager.redeemReceipt(account, receiptPresentation(201, expirationTime)); - assertThat(CompletableFutureTestUtil.assertFailsWithCause(StatusRuntimeException.class, result)) + assertThatExceptionOfType(StatusRuntimeException.class) + .isThrownBy(() -> authManager.redeemReceipt(account, receiptPresentation(201, expirationTime))) .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.INVALID_ARGUMENT); verifyNoInteractions(accountsManager); @@ -484,8 +483,7 @@ public class BackupAuthManagerTest { ? new Account.BackupVoucher(1, Instant.EPOCH.plus(Duration.ofSeconds(1))) : null) .build(); - final BackupAuthManager.BackupIdRotationLimit limit = authManager.checkBackupIdRotationLimit(account) - .toCompletableFuture().join(); + final BackupAuthManager.BackupIdRotationLimit limit = authManager.checkBackupIdRotationLimit(account); final boolean expectHasPermits = !messageLimited && (!mediaLimited || !hasVoucher); final Duration expectedDuration = expectHasPermits ? Duration.ZERO : Duration.ofDays(1); assertThat(limit.hasPermitsRemaining()).isEqualTo(expectHasPermits); @@ -524,7 +522,7 @@ public class BackupAuthManagerTest { .backupVoucher(backupVoucher) .build(); - when(accountsManager.updateAsync(any(), any())).thenReturn(CompletableFuture.completedFuture(account)); + when(accountsManager.update(any(), any())).thenReturn(account); final Optional newMessagesCredential = switch (messageChange) { case MATCH -> Optional.of(storedMessagesCredential); @@ -543,7 +541,7 @@ public class BackupAuthManagerTest { final boolean expectRateLimit = ((mediaChange == CredentialChangeType.MISMATCH) && rateLimitMediaBackupId && paid) || ((messageChange == CredentialChangeType.MISMATCH) && rateLimitMessagesBackupId); final ThrowableAssert.ThrowingCallable commit = () -> - authManager.commitBackupId(account, primaryDevice(), newMessagesCredential, newMediaCredential).join(); + authManager.commitBackupId(account, primaryDevice(), newMessagesCredential, newMediaCredential); if (messageChange == CredentialChangeType.NO_UPDATE && mediaChange == CredentialChangeType.NO_UPDATE) { assertThatExceptionOfType(StatusRuntimeException.class) @@ -551,7 +549,7 @@ public class BackupAuthManagerTest { .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.Code.INVALID_ARGUMENT); } else if (expectRateLimit) { - assertThatException().isThrownBy(commit).withRootCauseInstanceOf(RateLimitExceededException.class); + assertThatExceptionOfType(RateLimitExceededException.class).isThrownBy(commit); } else { assertThatNoException().isThrownBy(commit); } @@ -611,26 +609,29 @@ public class BackupAuthManagerTest { } - private static RateLimiters rateLimiter(final UUID aci, boolean rateLimitBackupId, - boolean rateLimitPaidMediaBackupId) { - final RateLimiters limiters = mock(RateLimiters.class); + private static RateLimiters rateLimiter(final UUID aci, boolean rateLimitBackupId, boolean rateLimitPaidMediaBackupId) { + try { + final RateLimiters limiters = mock(RateLimiters.class); - final RateLimiter allowLimiter = mock(RateLimiter.class); - when(allowLimiter.hasAvailablePermitsAsync(eq(aci), anyLong())).thenReturn(CompletableFuture.completedFuture(true)); - when(allowLimiter.validateAsync(aci)).thenReturn(CompletableFuture.completedFuture(null)); - when(allowLimiter.config()).thenReturn(new RateLimiterConfig(1, Duration.ofDays(1), false)); + final RateLimiter allowLimiter = mock(RateLimiter.class); + when(allowLimiter.hasAvailablePermitsAsync(eq(aci), anyLong())).thenReturn( + CompletableFuture.completedFuture(true)); + when(allowLimiter.config()).thenReturn(new RateLimiterConfig(1, Duration.ofDays(1), false)); - final RateLimiter denyLimiter = mock(RateLimiter.class); - when(denyLimiter.hasAvailablePermitsAsync(eq(aci), anyLong())).thenReturn(CompletableFuture.completedFuture(false)); - when(denyLimiter.validateAsync(aci)) - .thenReturn(CompletableFuture.failedFuture(new RateLimitExceededException(null))); - when(denyLimiter.config()).thenReturn(new RateLimiterConfig(1, Duration.ofDays(1), false)); + final RateLimiter denyLimiter = mock(RateLimiter.class); + when(denyLimiter.hasAvailablePermitsAsync(eq(aci), anyLong())).thenReturn( + CompletableFuture.completedFuture(false)); + doThrow(new RateLimitExceededException(null)).when(denyLimiter).validate(aci); + when(denyLimiter.config()).thenReturn(new RateLimiterConfig(1, Duration.ofDays(1), false)); - when(limiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID)) - .thenReturn(rateLimitBackupId ? denyLimiter : allowLimiter); - when(limiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID)) - .thenReturn(rateLimitPaidMediaBackupId ? denyLimiter : allowLimiter); - return limiters; + when(limiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID)) + .thenReturn(rateLimitBackupId ? denyLimiter : allowLimiter); + when(limiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID)) + .thenReturn(rateLimitPaidMediaBackupId ? denyLimiter : allowLimiter); + return limiters; + } catch (RateLimitExceededException e) { + throw new RuntimeException(e); + } } private RedemptionRange range(Duration length) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthTestUtil.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthTestUtil.java index e8b919b62..3bcf41a09 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthTestUtil.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupAuthTestUtil.java @@ -76,6 +76,6 @@ public class BackupAuthTestUtil { }); final RedemptionRange redemptionRange; redemptionRange = RedemptionRange.inclusive(clock, redemptionStart, redemptionEnd); - return issuer.getBackupAuthCredentials(account, credentialType, redemptionRange).join(); + return issuer.getBackupAuthCredentials(account, credentialType, redemptionRange); } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java index f09ad6fc5..20189075a 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java @@ -13,6 +13,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -230,11 +231,11 @@ public class BackupManagerTest { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupCredentialType.MESSAGES, backupLevel); - backupManager.createMessageBackupUploadDescriptor(backupUser).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser); verify(tusCredentialGenerator, times(1)) .generateUpload("%s/%s".formatted(backupUser.backupDir(), BackupManager.MESSAGE_BACKUP_NAME)); - final BackupManager.BackupInfo info = backupManager.backupInfo(backupUser).join(); + final BackupManager.BackupInfo info = backupManager.backupInfo(backupUser); assertThat(info.backupSubdir()).isEqualTo(backupUser.backupDir()).isNotBlank(); assertThat(info.messageBackupKey()).isEqualTo(BackupManager.MESSAGE_BACKUP_NAME); assertThat(info.mediaUsedSpace()).isEqualTo(Optional.empty()); @@ -253,18 +254,17 @@ public class BackupManagerTest { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupCredentialType.MEDIA, backupLevel); assertThatExceptionOfType(StatusRuntimeException.class) - .isThrownBy(() -> backupManager.createMessageBackupUploadDescriptor(backupUser).join()) + .isThrownBy(() -> backupManager.createMessageBackupUploadDescriptor(backupUser)) .matches(exception -> exception.getStatus().getCode() == Status.UNAUTHENTICATED.getCode()); } @Test - public void createTemporaryMediaAttachmentRateLimited() { + public void createTemporaryMediaAttachmentRateLimited() throws RateLimitExceededException { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupCredentialType.MEDIA, BackupLevel.PAID); - when(mediaUploadLimiter.validateAsync(eq(BackupManager.rateLimitKey(backupUser)))) - .thenReturn(CompletableFuture.failedFuture(new RateLimitExceededException(null))); - CompletableFutureTestUtil.assertFailsWithCause( - RateLimitExceededException.class, - backupManager.createTemporaryAttachmentUploadDescriptor(backupUser).toCompletableFuture()); + doThrow(new RateLimitExceededException(null)) + .when(mediaUploadLimiter).validate(eq(BackupManager.rateLimitKey(backupUser))); + assertThatExceptionOfType(RateLimitExceededException.class) + .isThrownBy(() -> backupManager.createTemporaryAttachmentUploadDescriptor(backupUser)); } @Test @@ -297,11 +297,11 @@ public class BackupManagerTest { // create backup at t=tstart testClock.pin(tstart); - backupManager.createMessageBackupUploadDescriptor(backupUser).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser); // refresh at t=tnext testClock.pin(tnext); - backupManager.ttlRefresh(backupUser).join(); + backupManager.ttlRefresh(backupUser); checkExpectedExpirations( tnext.truncatedTo(ChronoUnit.DAYS), @@ -319,11 +319,11 @@ public class BackupManagerTest { // create backup at t=tstart testClock.pin(tstart); - backupManager.createMessageBackupUploadDescriptor(backupUser).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser); // create again at t=tnext testClock.pin(tnext); - backupManager.createMessageBackupUploadDescriptor(backupUser).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser); checkExpectedExpirations( tnext.truncatedTo(ChronoUnit.DAYS), @@ -363,7 +363,7 @@ public class BackupManagerTest { backupManager.setPublicKey( presentation, keyPair.getPrivateKey().calculateSignature(presentation.serialize()), - keyPair.getPublicKey()).join(); + keyPair.getPublicKey()); assertThatExceptionOfType(StatusRuntimeException.class) .isThrownBy(() -> backupManager.authenticateBackupUser( @@ -384,10 +384,10 @@ public class BackupManagerTest { final byte[] signature = keyPair.getPrivateKey().calculateSignature(presentation.serialize()); // haven't set a public key yet - assertThat(CompletableFutureTestUtil.assertFailsWithCause( - StatusRuntimeException.class, - backupManager.authenticateBackupUser(presentation, signature, null)) - .getStatus().getCode()) + assertThatExceptionOfType(StatusRuntimeException.class) + .isThrownBy(() -> backupManager.authenticateBackupUser(presentation, signature, null)) + .extracting(StatusRuntimeException::getStatus) + .extracting(Status::getCode) .isEqualTo(Status.UNAUTHENTICATED.getCode()); } @@ -401,17 +401,17 @@ public class BackupManagerTest { final byte[] signature1 = keyPair1.getPrivateKey().calculateSignature(presentation.serialize()); final byte[] signature2 = keyPair2.getPrivateKey().calculateSignature(presentation.serialize()); - backupManager.setPublicKey(presentation, signature1, keyPair1.getPublicKey()).join(); + backupManager.setPublicKey(presentation, signature1, keyPair1.getPublicKey()); // shouldn't be able to set a different public key - assertThat(CompletableFutureTestUtil.assertFailsWithCause( - StatusRuntimeException.class, + assertThatExceptionOfType(StatusRuntimeException.class).isThrownBy(() -> backupManager.setPublicKey(presentation, signature2, keyPair2.getPublicKey())) - .getStatus().getCode()) + .extracting(StatusRuntimeException::getStatus) + .extracting(Status::getCode) .isEqualTo(Status.UNAUTHENTICATED.getCode()); // should be able to set the same public key again (noop) - backupManager.setPublicKey(presentation, signature1, keyPair1.getPublicKey()).join(); + backupManager.setPublicKey(presentation, signature1, keyPair1.getPublicKey()); } @Test @@ -432,17 +432,17 @@ public class BackupManagerTest { .extracting(ex -> ex.getStatus().getCode()) .isEqualTo(Status.UNAUTHENTICATED.getCode()); - backupManager.setPublicKey(presentation, signature, keyPair.getPublicKey()).join(); + backupManager.setPublicKey(presentation, signature, keyPair.getPublicKey()); // shouldn't be able to authenticate with an invalid signature - assertThat(CompletableFutureTestUtil.assertFailsWithCause( - StatusRuntimeException.class, - backupManager.authenticateBackupUser(presentation, wrongSignature, null)) - .getStatus().getCode()) + assertThatExceptionOfType(StatusRuntimeException.class) + .isThrownBy(() -> backupManager.authenticateBackupUser(presentation, wrongSignature, null)) + .extracting(StatusRuntimeException::getStatus) + .extracting(Status::getCode) .isEqualTo(Status.UNAUTHENTICATED.getCode()); // correct signature - final AuthenticatedBackupUser user = backupManager.authenticateBackupUser(presentation, signature, null).join(); + final AuthenticatedBackupUser user = backupManager.authenticateBackupUser(presentation, signature, null); assertThat(user.backupId()).isEqualTo(presentation.getBackupId()); assertThat(user.backupLevel()).isEqualTo(BackupLevel.FREE); } @@ -456,15 +456,15 @@ public class BackupManagerTest { backupKey, aci); final ECKeyPair keyPair = ECKeyPair.generate(); final byte[] signature = keyPair.getPrivateKey().calculateSignature(oldCredential.serialize()); - backupManager.setPublicKey(oldCredential, signature, keyPair.getPublicKey()).join(); + backupManager.setPublicKey(oldCredential, signature, keyPair.getPublicKey()); // should be accepted the day before to forgive clock skew testClock.pin(Instant.ofEpochSecond(1)); - assertThatNoException().isThrownBy(() -> backupManager.authenticateBackupUser(oldCredential, signature, null).join()); + assertThatNoException().isThrownBy(() -> backupManager.authenticateBackupUser(oldCredential, signature, null)); // should be accepted the day after to forgive clock skew testClock.pin(Instant.ofEpochSecond(1).plus(Duration.ofDays(2))); - assertThatNoException().isThrownBy(() -> backupManager.authenticateBackupUser(oldCredential, signature, null).join()); + assertThatNoException().isThrownBy(() -> backupManager.authenticateBackupUser(oldCredential, signature, null)); // should be rejected the day after that testClock.pin(Instant.ofEpochSecond(1).plus(Duration.ofDays(3))); @@ -713,8 +713,7 @@ public class BackupManagerTest { Optional.of("newCursor") ))); - final BackupManager.ListMediaResult result = backupManager.list(backupUser, cursor, 17) - .toCompletableFuture().join(); + final BackupManager.ListMediaResult result = backupManager.list(backupUser, cursor, 17); assertThat(result.media()).hasSize(1); assertThat(result.media().getFirst().cdn()).isEqualTo(13); assertThat(result.media().getFirst().key()).isEqualTo( @@ -733,7 +732,7 @@ public class BackupManagerTest { when(svrbClient.removeData(anyString())).thenReturn(CompletableFuture.completedFuture(null)); // Deleting should swap the backupDir for the user - backupManager.deleteEntireBackup(original).join(); + backupManager.deleteEntireBackup(original); verifyNoInteractions(remoteStorageManager); verify(svrbClient).removeData(HexFormat.of().formatHex(BackupsDb.hashedBackupId(original.backupId()))); @@ -747,7 +746,7 @@ public class BackupManagerTest { Collections.emptyList(), Optional.empty() ))); - backupManager.deleteEntireBackup(after).join(); + backupManager.deleteEntireBackup(after); verify(remoteStorageManager, times(1)) .list(eq(after.backupDir() + "/"), eq(Optional.empty()), anyLong()); @@ -914,7 +913,7 @@ public class BackupManagerTest { .toList(); for (int i = 0; i < backupUsers.size(); i++) { testClock.pin(days(i)); - backupManager.createMessageBackupUploadDescriptor(backupUsers.get(i)).join(); + backupManager.createMessageBackupUploadDescriptor(backupUsers.get(i)); } // set of backup-id hashes that should be expired (initially t=0) @@ -949,11 +948,11 @@ public class BackupManagerTest { // refreshed media timestamp at t=5 testClock.pin(days(5)); - backupManager.createMessageBackupUploadDescriptor(backupUser(backupId, BackupCredentialType.MESSAGES, BackupLevel.PAID)).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser(backupId, BackupCredentialType.MESSAGES, BackupLevel.PAID)); // refreshed messages timestamp at t=6 testClock.pin(days(6)); - backupManager.createMessageBackupUploadDescriptor(backupUser(backupId, BackupCredentialType.MESSAGES, BackupLevel.FREE)).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser(backupId, BackupCredentialType.MESSAGES, BackupLevel.FREE)); Function> getExpired = time -> backupManager .getExpiredBackups(1, Schedulers.immediate(), time) @@ -974,7 +973,7 @@ public class BackupManagerTest { @EnumSource(mode = EnumSource.Mode.INCLUDE, names = {"MEDIA", "ALL"}) public void expireBackup(ExpiredBackup.ExpirationType expirationType) { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupCredentialType.MESSAGES, BackupLevel.PAID); - backupManager.createMessageBackupUploadDescriptor(backupUser).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser); final String expectedPrefixToDelete = switch (expirationType) { case ALL -> backupUser.backupDir(); @@ -1020,7 +1019,7 @@ public class BackupManagerTest { @Test public void deleteBackupPaginated() { final AuthenticatedBackupUser backupUser = backupUser(TestRandomUtil.nextBytes(16), BackupCredentialType.MESSAGES, BackupLevel.PAID); - backupManager.createMessageBackupUploadDescriptor(backupUser).join(); + backupManager.createMessageBackupUploadDescriptor(backupUser); final ExpiredBackup expiredBackup = expiredBackup(ExpiredBackup.ExpirationType.MEDIA, backupUser); final String mediaPrefix = expiredBackup.prefixToDelete() + "/"; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java index c42e378be..f0eb79a2d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/ArchiveControllerTest.java @@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.controllers; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; @@ -111,8 +112,8 @@ public class ArchiveControllerTest { reset(backupAuthManager); reset(backupManager); - when(accountsManager.getByAccountIdentifierAsync(AuthHelper.VALID_UUID)) - .thenReturn(CompletableFuture.completedFuture(Optional.of(AuthHelper.VALID_ACCOUNT))); + when(accountsManager.getByAccountIdentifier(AuthHelper.VALID_UUID)) + .thenReturn(Optional.of(AuthHelper.VALID_ACCOUNT)); } @ParameterizedTest @@ -164,9 +165,7 @@ public class ArchiveControllerTest { } @Test - public void setBackupId() { - when(backupAuthManager.commitBackupId(any(), any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); - + public void setBackupId() throws RateLimitExceededException { final Response response = resources.getJerseyTest() .target("v1/archives/backupid") .request() @@ -184,9 +183,7 @@ public class ArchiveControllerTest { } @Test - public void setBackupIdPartial() { - when(backupAuthManager.commitBackupId(any(), any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); - + public void setBackupIdPartial() throws RateLimitExceededException { final Response response = resources.getJerseyTest() .target("v1/archives/backupid") .request() @@ -210,8 +207,7 @@ public class ArchiveControllerTest { }) public void backupIdLimits(boolean hasPermits, long waitSeconds) { when(backupAuthManager.checkBackupIdRotationLimit(any())) - .thenReturn(CompletableFuture.completedFuture( - new BackupAuthManager.BackupIdRotationLimit(hasPermits, Duration.ofSeconds(waitSeconds)))); + .thenReturn(new BackupAuthManager.BackupIdRotationLimit(hasPermits, Duration.ofSeconds(waitSeconds))); final ArchiveController.BackupIdLimitResponse response = resources.getJerseyTest() .target("v1/archives/backupid/limits") @@ -233,7 +229,6 @@ public class ArchiveControllerTest { final ReceiptCredentialResponse rcr = serverOps.issueReceiptCredential(rcrc.getRequest(), 0L, 3L); final ReceiptCredential receiptCredential = clientOps.receiveReceiptCredential(rcrc, rcr); final ReceiptCredentialPresentation presentation = clientOps.createReceiptCredentialPresentation(receiptCredential); - when(backupAuthManager.redeemReceipt(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); final Response response = resources.getJerseyTest() .target("v1/archives/redeem-receipt") @@ -248,8 +243,6 @@ public class ArchiveControllerTest { @Test public void setBadPublicKey() throws VerificationFailedException { - when(backupManager.setPublicKey(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); - final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); final Response response = resources.getJerseyTest() @@ -265,8 +258,6 @@ public class ArchiveControllerTest { @Test public void setMissingPublicKey() throws VerificationFailedException { - when(backupManager.setPublicKey(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); - final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); final Response response = resources.getJerseyTest() @@ -280,8 +271,6 @@ public class ArchiveControllerTest { @Test public void setPublicKey() throws VerificationFailedException { - when(backupManager.setPublicKey(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); - final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); final Response response = resources.getJerseyTest() @@ -312,20 +301,15 @@ public class ArchiveControllerTest { public static Stream setBackupIdException() { return Stream.of( - Arguments.of(new RateLimitExceededException(null), false, 429), - Arguments.of(Status.INVALID_ARGUMENT.withDescription("async").asRuntimeException(), false, 400), - Arguments.of(Status.INVALID_ARGUMENT.withDescription("sync").asRuntimeException(), true, 400) + Arguments.of(new RateLimitExceededException(null), 429), + Arguments.of(Status.INVALID_ARGUMENT.withDescription("test").asRuntimeException(), 400) ); } @ParameterizedTest @MethodSource - public void setBackupIdException(final Exception ex, final boolean sync, final int expectedStatus) { - if (sync) { - when(backupAuthManager.commitBackupId(any(), any(), any(), any())).thenThrow(ex); - } else { - when(backupAuthManager.commitBackupId(any(), any(), any(), any())).thenReturn(CompletableFuture.failedFuture(ex)); - } + public void setBackupIdException(final Exception ex, final int expectedStatus) throws RateLimitExceededException { + doThrow(ex).when(backupAuthManager).commitBackupId(any(), any(), any(), any()); final Response response = resources.getJerseyTest() .target("v1/archives/backupid") .request() @@ -349,7 +333,7 @@ public class ArchiveControllerTest { expectedCredentialsByType.forEach((credentialType, expectedCredentials) -> when(backupAuthManager.getBackupAuthCredentials(any(), eq(credentialType), eq(expectedRange))) - .thenReturn(CompletableFuture.completedFuture(expectedCredentials))); + .thenReturn(expectedCredentials)); final ArchiveController.BackupAuthCredentialsResponse credentialResponse = resources.getJerseyTest() .target("v1/archives/auth") @@ -405,9 +389,9 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); - when(backupManager.backupInfo(any())).thenReturn(CompletableFuture.completedFuture(new BackupManager.BackupInfo( - 1, "myBackupDir", "myMediaDir", "filename", Optional.empty()))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); + when(backupManager.backupInfo(any())) + .thenReturn(new BackupManager.BackupInfo(1, "myBackupDir", "myMediaDir", "filename", Optional.empty())); final ArchiveController.BackupInfoResponse response = resources.getJerseyTest() .target("v1/archives") .request() @@ -425,7 +409,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final byte[][] mediaIds = new byte[][]{TestRandomUtil.nextBytes(15), TestRandomUtil.nextBytes(15)}; when(backupManager.copyToBackup(any(), any())) .thenReturn(Flux.just( @@ -470,7 +454,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final byte[][] mediaIds = IntStream.range(0, 4).mapToObj(i -> TestRandomUtil.nextBytes(15)).toArray(byte[][]::new); when(backupManager.copyToBackup(any(), any())) @@ -528,7 +512,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final byte[][] mediaIds = new byte[][]{TestRandomUtil.nextBytes(15), TestRandomUtil.nextBytes(15)}; final Response r = resources.getJerseyTest() .target("v1/archives/media/batch") @@ -561,17 +545,17 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final byte[] mediaId = TestRandomUtil.nextBytes(15); final Optional expectedCursor = cursorProvided ? Optional.of("myCursor") : Optional.empty(); final Optional returnedCursor = cursorReturned ? Optional.of("newCursor") : Optional.empty(); when(backupManager.list(any(), eq(expectedCursor), eq(17))) - .thenReturn(CompletableFuture.completedFuture(new BackupManager.ListMediaResult( + .thenReturn(new BackupManager.ListMediaResult( List.of(new BackupManager.StorageDescriptorWithLength(1, mediaId, 100)), returnedCursor - ))); + )); WebTarget target = resources.getJerseyTest() .target("v1/archives/media/") @@ -596,7 +580,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final ArchiveController.DeleteMedia deleteRequest = new ArchiveController.DeleteMedia( IntStream @@ -632,10 +616,9 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); when(backupManager.createMessageBackupUploadDescriptor(any())) - .thenReturn(CompletableFuture.completedFuture( - new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org"))); + .thenReturn(new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org")); final WebTarget builder = resources.getJerseyTest().target("v1/archives/upload/form"); final Response response = uploadLength @@ -658,14 +641,13 @@ public class ArchiveControllerTest { } @Test - public void mediaUploadForm() throws VerificationFailedException { + public void mediaUploadForm() throws VerificationFailedException, RateLimitExceededException { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); when(backupManager.createTemporaryAttachmentUploadDescriptor(any())) - .thenReturn(CompletableFuture.completedFuture( - new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org"))); + .thenReturn(new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org")); final ArchiveController.UploadDescriptorResponse desc = resources.getJerseyTest() .target("v1/archives/media/upload/form") .request() @@ -678,8 +660,7 @@ public class ArchiveControllerTest { assertThat(desc.signedUploadLocation()).isEqualTo("example.org"); // rate limit - when(backupManager.createTemporaryAttachmentUploadDescriptor(any())) - .thenReturn(CompletableFuture.failedFuture(new RateLimitExceededException(null))); + when(backupManager.createTemporaryAttachmentUploadDescriptor(any())).thenThrow(new RateLimitExceededException(null)); final Response response = resources.getJerseyTest() .target("v1/archives/media/upload/form") .request() @@ -694,7 +675,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); when(backupManager.generateReadAuth(any(), eq(3))).thenReturn(Map.of("key", "value")); final ArchiveController.ReadAuthResponse response = resources.getJerseyTest() .target("v1/archives/auth/read") @@ -712,7 +693,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final ExternalServiceCredentials credentials = new ExternalServiceCredentials("username", "password"); when(backupManager.generateSvrbAuth(any())).thenReturn(credentials); final ExternalServiceCredentials response = resources.getJerseyTest() @@ -751,8 +732,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation(BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); - when(backupManager.deleteEntireBackup(any())).thenReturn(CompletableFuture.completedFuture(null)); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); Response response = resources.getJerseyTest() .target("v1/archives/") .request() @@ -767,7 +747,7 @@ public class ArchiveControllerTest { final BackupAuthCredentialPresentation presentation = backupAuthTestUtil.getPresentation( BackupLevel.PAID, messagesBackupKey, aci); when(backupManager.authenticateBackupUser(any(), any(), any())) - .thenReturn(CompletableFuture.completedFuture(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); + .thenReturn(backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID)); final Response r = resources.getJerseyTest() .target("v1/archives/media") .request() diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckControllerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckControllerTest.java index 18f300f69..cc8d5909b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckControllerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/controllers/DeviceCheckControllerTest.java @@ -29,7 +29,6 @@ import java.util.Base64; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import org.glassfish.jersey.server.ServerProperties; import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory; import org.junit.jupiter.api.BeforeEach; @@ -45,7 +44,6 @@ import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.mappers.CompletionExceptionMapper; import org.whispersystems.textsecuregcm.mappers.GrpcStatusRuntimeExceptionMapper; import org.whispersystems.textsecuregcm.mappers.RateLimitExceededExceptionMapper; -import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.devicecheck.AppleDeviceCheckManager; import org.whispersystems.textsecuregcm.storage.devicecheck.ChallengeNotFoundException; @@ -206,11 +204,6 @@ class DeviceCheckControllerTest { {"action": "backup", "challenge": "embeddedChallenge"} """; - when(backupAuthManager.extendBackupVoucher(any(), eq(new Account.BackupVoucher( - REDEMPTION_LEVEL, - clock.instant().plus(REDEMPTION_DURATION))))) - .thenReturn(CompletableFuture.completedFuture(null)); - final Response response = resources.getJerseyTest() .target("v1/devicecheck/assert") .queryParam("keyId", Base64.getUrlEncoder().encodeToString(keyId)) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcServiceTest.java index 549c36d3c..519d07e16 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcServiceTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsAnonymousGrpcServiceTest.java @@ -90,14 +90,13 @@ class BackupsAnonymousGrpcServiceTest extends @BeforeEach void setup() { - when(backupManager.authenticateBackupUser(any(), any(), any())) + when(backupManager.authenticateBackupUserAsync(any(), any(), any())) .thenReturn(CompletableFuture.completedFuture( backupUser(presentation.getBackupId(), BackupCredentialType.MESSAGES, BackupLevel.PAID))); } @Test void setPublicKey() { - when(backupManager.setPublicKey(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); assertThatNoException().isThrownBy(() -> unauthenticatedServiceStub().setPublicKey(SetPublicKeyRequest.newBuilder() .setPublicKey(ByteString.copyFrom(ECKeyPair.generate().getPublicKey().serialize())) .setSignedPresentation(signedPresentation(presentation)) @@ -106,7 +105,6 @@ class BackupsAnonymousGrpcServiceTest extends @Test void setBadPublicKey() { - when(backupManager.setPublicKey(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null)); assertThatExceptionOfType(StatusRuntimeException.class).isThrownBy(() -> unauthenticatedServiceStub().setPublicKey(SetPublicKeyRequest.newBuilder() .setPublicKey(ByteString.copyFromUtf8("aaaaa")) // Invalid public key @@ -214,8 +212,8 @@ class BackupsAnonymousGrpcServiceTest extends @Test void getBackupInfo() { - when(backupManager.backupInfo(any())).thenReturn(CompletableFuture.completedFuture(new BackupManager.BackupInfo( - 1, "myBackupDir", "myMediaDir", "filename", Optional.empty()))); + when(backupManager.backupInfo(any())) + .thenReturn(new BackupManager.BackupInfo(1, "myBackupDir", "myMediaDir", "filename", Optional.empty())); final GetBackupInfoResponse response = unauthenticatedServiceStub().getBackupInfo(GetBackupInfoRequest.newBuilder() .setSignedPresentation(signedPresentation(presentation)) @@ -240,9 +238,9 @@ class BackupsAnonymousGrpcServiceTest extends final int limit = 17; when(backupManager.list(any(), eq(expectedCursor), eq(limit))) - .thenReturn(CompletableFuture.completedFuture(new BackupManager.ListMediaResult( + .thenReturn(new BackupManager.ListMediaResult( List.of(new BackupManager.StorageDescriptorWithLength(1, mediaId, 100)), - returnedCursor))); + returnedCursor)); final ListMediaRequest.Builder request = ListMediaRequest.newBuilder() .setSignedPresentation(signedPresentation(presentation)) @@ -280,10 +278,9 @@ class BackupsAnonymousGrpcServiceTest extends } @Test - void mediaUploadForm() { + void mediaUploadForm() throws RateLimitExceededException { when(backupManager.createTemporaryAttachmentUploadDescriptor(any())) - .thenReturn(CompletableFuture.completedFuture( - new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org"))); + .thenReturn(new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org")); final GetUploadFormRequest request = GetUploadFormRequest.newBuilder() .setMedia(GetUploadFormRequest.MediaUploadType.getDefaultInstance()) .setSignedPresentation(signedPresentation(presentation)) @@ -298,7 +295,7 @@ class BackupsAnonymousGrpcServiceTest extends // rate limit Duration duration = Duration.ofSeconds(10); when(backupManager.createTemporaryAttachmentUploadDescriptor(any())) - .thenReturn(CompletableFuture.failedFuture(new RateLimitExceededException(duration))); + .thenThrow(new RateLimitExceededException(duration)); GrpcTestUtils.assertRateLimitExceeded(duration, () -> unauthenticatedServiceStub().getUploadForm(request)); } @@ -314,8 +311,7 @@ class BackupsAnonymousGrpcServiceTest extends @MethodSource public void messagesUploadForm(Optional uploadLength, boolean expectSuccess) { when(backupManager.createMessageBackupUploadDescriptor(any())) - .thenReturn(CompletableFuture.completedFuture( - new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org"))); + .thenReturn(new BackupUploadDescriptor(3, "abc", Map.of("k", "v"), "example.org")); final GetUploadFormRequest.MessagesUploadType.Builder builder = GetUploadFormRequest.MessagesUploadType.newBuilder(); uploadLength.ifPresent(builder::setUploadLength); final GetUploadFormRequest request = GetUploadFormRequest.newBuilder() diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcServiceTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcServiceTest.java index 0ad64ebd1..bbe8e2542 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcServiceTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/BackupsGrpcServiceTest.java @@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.grpc; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -90,15 +91,14 @@ class BackupsGrpcServiceTest extends SimpleBaseGrpcTest setBackupIdException() { return Stream.of( - Arguments.of(new RateLimitExceededException(null), false, Status.RESOURCE_EXHAUSTED), - Arguments.of(Status.INVALID_ARGUMENT.withDescription("async").asRuntimeException(), false, - Status.INVALID_ARGUMENT), - Arguments.of(Status.INVALID_ARGUMENT.withDescription("sync").asRuntimeException(), true, + Arguments.of(new RateLimitExceededException(null), Status.RESOURCE_EXHAUSTED), + Arguments.of(Status.INVALID_ARGUMENT.withDescription("test").asRuntimeException(), Status.INVALID_ARGUMENT) ); } @ParameterizedTest @MethodSource - void setBackupIdException(final Exception ex, final boolean sync, final Status expected) { - if (sync) { - when(backupAuthManager.commitBackupId(any(), any(), any(), any())).thenThrow(ex); - } else { - when(backupAuthManager.commitBackupId(any(), any(), any(), any())) - .thenReturn(CompletableFuture.failedFuture(ex)); - } + void setBackupIdException(final Exception ex, final Status expected) + throws RateLimitExceededException { + doThrow(ex).when(backupAuthManager).commitBackupId(any(), any(), any(), any()); GrpcTestUtils.assertStatusException( expected, () -> authenticatedServiceStub().setBackupId(SetBackupIdRequest.newBuilder() @@ -180,8 +171,6 @@ class BackupsGrpcServiceTest extends SimpleBaseGrpcTest when(backupAuthManager.getBackupAuthCredentials(any(), eq(credentialType), eq(expectedRange))) - .thenReturn(CompletableFuture.completedFuture(expectedCredentials))); + .thenReturn(expectedCredentials)); final GetBackupAuthCredentialsResponse credentialResponse = authenticatedServiceStub().getBackupAuthCredentials( GetBackupAuthCredentialsRequest.newBuilder() diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java index a97525df4..af614b931 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/grpc/ErrorMappingInterceptorTest.java @@ -15,6 +15,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.protobuf.StatusProto; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -120,6 +121,20 @@ class ErrorMappingInterceptorTest { client.echo(EchoRequest.getDefaultInstance())); } + @Test + public void mapWrappedIOExceptionsSimple() throws Exception { + server = InProcessServerBuilder.forName("ErrorMappingInterceptorTest") + .directExecutor() + .addService(new SimpleEchoServiceErrorImpl(new CompletionException(new UncheckedIOException(new IOException("test"))))) + .intercept(new ErrorMappingInterceptor()) + .build() + .start(); + + final EchoServiceGrpc.EchoServiceBlockingStub client = EchoServiceGrpc.newBlockingStub(channel); + GrpcTestUtils.assertStatusException(Status.UNAVAILABLE, "UNAVAILABLE", () -> + client.echo(EchoRequest.getDefaultInstance())); + } + static class ReactorEchoServiceErrorImpl extends ReactorEchoServiceGrpc.EchoServiceImplBase {