mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 20:38:04 +01:00
Make Backup methods synchronous
This commit is contained in:
@@ -40,7 +40,6 @@ import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.RedeemedReceiptsManager;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
/**
|
||||
* Issues ZK backup auth credentials for authenticated accounts
|
||||
@@ -57,7 +56,6 @@ public class BackupAuthManager {
|
||||
private static final Logger logger = LoggerFactory.getLogger(BackupAuthManager.class);
|
||||
|
||||
|
||||
final static Duration MAX_REDEMPTION_DURATION = Duration.ofDays(7);
|
||||
final static String BACKUP_MEDIA_EXPERIMENT_NAME = "backupMedia";
|
||||
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
@@ -94,14 +92,13 @@ public class BackupAuthManager {
|
||||
* message backups
|
||||
* @param mediaBackupCredentialRequest A request containing the blinded backup-id the client will use to upload
|
||||
* media backups
|
||||
* @return A future that completes when the credentialRequest has been stored
|
||||
* @throws RateLimitExceededException If too many backup-ids have been committed
|
||||
*/
|
||||
public CompletableFuture<Void> commitBackupId(
|
||||
public void commitBackupId(
|
||||
final Account account,
|
||||
final Device device,
|
||||
final Optional<BackupAuthCredentialRequest> messagesBackupCredentialRequest,
|
||||
final Optional<BackupAuthCredentialRequest> mediaBackupCredentialRequest) {
|
||||
final Optional<BackupAuthCredentialRequest> mediaBackupCredentialRequest) throws RateLimitExceededException {
|
||||
if (!device.isPrimary()) {
|
||||
throw Status.PERMISSION_DENIED.withDescription("Only primary device can set backup-id").asRuntimeException();
|
||||
}
|
||||
@@ -133,33 +130,24 @@ public class BackupAuthManager {
|
||||
if (!requiresMessageRotation && !requiresMediaRotation) {
|
||||
// No need to update or enforce rate limits, this is the credential that the user has already
|
||||
// committed to.
|
||||
return CompletableFuture.completedFuture(null);
|
||||
return;
|
||||
}
|
||||
|
||||
CompletableFuture<Void> rateLimitFuture = CompletableFuture.completedFuture(null);
|
||||
|
||||
if (requiresMessageRotation) {
|
||||
rateLimitFuture = rateLimitFuture.thenCombine(
|
||||
rateLimiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID).validateAsync(account.getUuid()),
|
||||
(_, _) -> null);
|
||||
rateLimiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID).validate(account.getUuid());
|
||||
}
|
||||
|
||||
if (requiresMediaRotation && hasActiveVoucher(account)) {
|
||||
rateLimitFuture = rateLimitFuture.thenCombine(
|
||||
rateLimiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID).validateAsync(account.getUuid()),
|
||||
(_, _) -> null);
|
||||
rateLimiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID).validate(account.getUuid());
|
||||
}
|
||||
|
||||
return rateLimitFuture.thenCompose(ignored -> this.accountsManager
|
||||
.updateAsync(account, a ->
|
||||
a.setBackupCredentialRequests(targetMessageCredentialRequest, targetMediaCredentialRequest))
|
||||
.thenRun(Util.NOOP))
|
||||
.toCompletableFuture();
|
||||
this.accountsManager.update(account, a ->
|
||||
a.setBackupCredentialRequests(targetMessageCredentialRequest, targetMediaCredentialRequest));
|
||||
}
|
||||
|
||||
public record BackupIdRotationLimit(boolean hasPermitsRemaining, Duration nextPermitAvailable) {}
|
||||
|
||||
public CompletionStage<BackupIdRotationLimit> checkBackupIdRotationLimit(final Account account) {
|
||||
public BackupIdRotationLimit checkBackupIdRotationLimit(final Account account) {
|
||||
final RateLimiter messagesLimiter = rateLimiters.forDescriptor(RateLimiters.For.SET_BACKUP_ID);
|
||||
final RateLimiter mediaLimiter = rateLimiters.forDescriptor(RateLimiters.For.SET_PAID_MEDIA_BACKUP_ID);
|
||||
|
||||
@@ -180,7 +168,7 @@ public class BackupAuthManager {
|
||||
isPaid ? mediaLimiter.config().permitRegenerationDuration() : Duration.ZERO));
|
||||
return new BackupIdRotationLimit(false, timeToNextPermit);
|
||||
}
|
||||
});
|
||||
}).toCompletableFuture().join();
|
||||
}
|
||||
|
||||
public record Credential(BackupAuthCredentialResponse credential, Instant redemptionTime) {}
|
||||
@@ -200,19 +188,20 @@ public class BackupAuthManager {
|
||||
* @param redemptionRange The time range to return credentials for
|
||||
* @return Credentials and the day on which they may be redeemed
|
||||
*/
|
||||
public CompletableFuture<List<Credential>> getBackupAuthCredentials(
|
||||
public List<Credential> getBackupAuthCredentials(
|
||||
final Account account,
|
||||
final BackupCredentialType credentialType,
|
||||
final RedemptionRange redemptionRange) {
|
||||
|
||||
// If the account has an expired payment, clear it before continuing
|
||||
if (hasExpiredVoucher(account)) {
|
||||
return accountsManager.updateAsync(account, a -> {
|
||||
final Account updated = accountsManager.update(account, a -> {
|
||||
// Re-check in case we raced with an update
|
||||
if (hasExpiredVoucher(a)) {
|
||||
a.setBackupVoucher(null);
|
||||
}
|
||||
}).thenCompose(updated -> getBackupAuthCredentials(updated, credentialType, redemptionRange));
|
||||
});
|
||||
return getBackupAuthCredentials(updated, credentialType, redemptionRange);
|
||||
}
|
||||
|
||||
// fetch the blinded backup-id the account should have previously committed to
|
||||
@@ -224,7 +213,7 @@ public class BackupAuthManager {
|
||||
|
||||
// create a credential for every day in the requested period
|
||||
final BackupAuthCredentialRequest credentialReq = new BackupAuthCredentialRequest(committedBytes);
|
||||
return CompletableFuture.completedFuture(StreamSupport.stream(redemptionRange.spliterator(), false)
|
||||
return StreamSupport.stream(redemptionRange.spliterator(), false)
|
||||
.map(redemptionTime -> {
|
||||
// Check if the account has a voucher that's good for a certain receiptLevel at redemption time, otherwise
|
||||
// use the default receipt level
|
||||
@@ -233,7 +222,7 @@ public class BackupAuthManager {
|
||||
credentialReq.issueCredential(redemptionTime, backupLevel, credentialType, serverSecretParams),
|
||||
redemptionTime);
|
||||
})
|
||||
.toList());
|
||||
.toList();
|
||||
} catch (InvalidInputException e) {
|
||||
throw Status.INTERNAL
|
||||
.withDescription("Could not deserialize stored request credential")
|
||||
@@ -247,9 +236,8 @@ public class BackupAuthManager {
|
||||
*
|
||||
* @param account The account to enable backups on
|
||||
* @param receiptCredentialPresentation A ZK receipt presentation proving payment
|
||||
* @return A future that completes successfully when the account has been updated
|
||||
*/
|
||||
public CompletableFuture<Void> redeemReceipt(
|
||||
public void redeemReceipt(
|
||||
final Account account,
|
||||
final ReceiptCredentialPresentation receiptCredentialPresentation) {
|
||||
try {
|
||||
@@ -279,16 +267,15 @@ public class BackupAuthManager {
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
return redeemedReceiptsManager
|
||||
boolean receiptAllowed = redeemedReceiptsManager
|
||||
.put(receiptSerial, receiptExpiration.getEpochSecond(), receiptLevel, account.getUuid())
|
||||
.thenCompose(receiptAllowed -> {
|
||||
if (!receiptAllowed) {
|
||||
throw Status.INVALID_ARGUMENT
|
||||
.withDescription("receipt serial is already redeemed")
|
||||
.asRuntimeException();
|
||||
}
|
||||
return extendBackupVoucher(account, new Account.BackupVoucher(receiptLevel, receiptExpiration));
|
||||
});
|
||||
.join();
|
||||
if (!receiptAllowed) {
|
||||
throw Status.INVALID_ARGUMENT
|
||||
.withDescription("receipt serial is already redeemed")
|
||||
.asRuntimeException();
|
||||
}
|
||||
extendBackupVoucher(account, new Account.BackupVoucher(receiptLevel, receiptExpiration));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -296,11 +283,9 @@ public class BackupAuthManager {
|
||||
*
|
||||
* @param account The account to update
|
||||
* @param backupVoucher The backup voucher to apply to this account
|
||||
* @return A future that completes once the account has been updated to have at least the level and expiration
|
||||
* in the provided voucher.
|
||||
*/
|
||||
public CompletableFuture<Void> extendBackupVoucher(final Account account, final Account.BackupVoucher backupVoucher) {
|
||||
return accountsManager.updateAsync(account, a -> {
|
||||
public void extendBackupVoucher(final Account account, final Account.BackupVoucher backupVoucher) {
|
||||
accountsManager.update(account, a -> {
|
||||
// Receipt credential expirations must be day aligned. Make sure any manually set backupVoucher is also day
|
||||
// aligned
|
||||
final Account.BackupVoucher newPayment = new Account.BackupVoucher(
|
||||
@@ -308,7 +293,7 @@ public class BackupAuthManager {
|
||||
backupVoucher.expiration().truncatedTo(ChronoUnit.DAYS));
|
||||
final Account.BackupVoucher existingPayment = a.getBackupVoucher();
|
||||
a.setBackupVoucher(merge(existingPayment, newPayment));
|
||||
}).thenRun(Util.NOOP);
|
||||
});
|
||||
}
|
||||
|
||||
private static Account.BackupVoucher merge(@Nullable final Account.BackupVoucher prev,
|
||||
|
||||
@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.backup;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.dropwizard.util.DataSize;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
@@ -42,12 +43,12 @@ import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
|
||||
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicBackupConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
|
||||
@@ -128,7 +129,7 @@ public class BackupManager {
|
||||
* @param signature the signature of the presentation
|
||||
* @param publicKey the public key of a key-pair that the presentation must be signed with
|
||||
*/
|
||||
public CompletableFuture<Void> setPublicKey(
|
||||
public void setPublicKey(
|
||||
final BackupAuthCredentialPresentation presentation,
|
||||
final byte[] signature,
|
||||
final ECPublicKey publicKey) {
|
||||
@@ -139,8 +140,10 @@ public class BackupManager {
|
||||
final Pair<BackupCredentialType, BackupLevel> credentialTypeAndBackupLevel =
|
||||
verifyPresentation(presentation).verifySignature(signature, publicKey);
|
||||
|
||||
return backupsDb.setPublicKey(presentation.getBackupId(), credentialTypeAndBackupLevel.second(), publicKey)
|
||||
.exceptionally(ExceptionUtils.exceptionallyHandler(PublicKeyConflictException.class, ex -> {
|
||||
ExceptionUtils.unwrapSupply(
|
||||
PublicKeyConflictException.class,
|
||||
() -> backupsDb.setPublicKey(presentation.getBackupId(), credentialTypeAndBackupLevel.second(), publicKey).join(),
|
||||
_ -> {
|
||||
Metrics.counter(ZK_AUTHN_COUNTER_NAME,
|
||||
SUCCESS_TAG_NAME, String.valueOf(false),
|
||||
FAILURE_REASON_TAG_NAME, "public_key_conflict")
|
||||
@@ -148,7 +151,7 @@ public class BackupManager {
|
||||
throw Status.UNAUTHENTICATED
|
||||
.withDescription("public key does not match existing public key for the backup-id")
|
||||
.asRuntimeException();
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -159,31 +162,27 @@ public class BackupManager {
|
||||
* @param backupUser an already ZK authenticated backup user
|
||||
* @return the upload form
|
||||
*/
|
||||
public CompletableFuture<BackupUploadDescriptor> createMessageBackupUploadDescriptor(
|
||||
public BackupUploadDescriptor createMessageBackupUploadDescriptor(
|
||||
final AuthenticatedBackupUser backupUser) {
|
||||
checkBackupLevel(backupUser, BackupLevel.FREE);
|
||||
checkBackupCredentialType(backupUser, BackupCredentialType.MESSAGES);
|
||||
|
||||
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
|
||||
return backupsDb
|
||||
.addMessageBackup(backupUser)
|
||||
.thenApply(_ ->
|
||||
cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)));
|
||||
backupsDb.addMessageBackup(backupUser).join();
|
||||
return cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser));
|
||||
}
|
||||
|
||||
public CompletableFuture<BackupUploadDescriptor> createTemporaryAttachmentUploadDescriptor(
|
||||
final AuthenticatedBackupUser backupUser) {
|
||||
public BackupUploadDescriptor createTemporaryAttachmentUploadDescriptor(final AuthenticatedBackupUser backupUser)
|
||||
throws RateLimitExceededException {
|
||||
checkBackupLevel(backupUser, BackupLevel.PAID);
|
||||
checkBackupCredentialType(backupUser, BackupCredentialType.MEDIA);
|
||||
|
||||
return rateLimiters.forDescriptor(RateLimiters.For.BACKUP_ATTACHMENT)
|
||||
.validateAsync(rateLimitKey(backupUser)).thenApply(ignored -> {
|
||||
final byte[] bytes = new byte[15];
|
||||
secureRandom.nextBytes(bytes);
|
||||
final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes);
|
||||
final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey);
|
||||
return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation());
|
||||
}).toCompletableFuture();
|
||||
rateLimiters.forDescriptor(RateLimiters.For.BACKUP_ATTACHMENT).validate(rateLimitKey(backupUser));
|
||||
final byte[] bytes = new byte[15];
|
||||
secureRandom.nextBytes(bytes);
|
||||
final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes);
|
||||
final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey);
|
||||
return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -191,36 +190,35 @@ public class BackupManager {
|
||||
*
|
||||
* @param backupUser an already ZK authenticated backup user
|
||||
*/
|
||||
public CompletableFuture<Void> ttlRefresh(final AuthenticatedBackupUser backupUser) {
|
||||
public void ttlRefresh(final AuthenticatedBackupUser backupUser) {
|
||||
checkBackupLevel(backupUser, BackupLevel.FREE);
|
||||
// update message backup TTL
|
||||
return backupsDb.ttlRefresh(backupUser).thenAccept(storedBackupAttributes -> {
|
||||
if (backupUser.credentialType() == BackupCredentialType.MEDIA) {
|
||||
final long maxTotalMediaSize =
|
||||
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxTotalMediaSize();
|
||||
final StoredBackupAttributes storedBackupAttributes = backupsDb.ttlRefresh(backupUser).join();
|
||||
if (backupUser.credentialType() == BackupCredentialType.MEDIA) {
|
||||
final long maxTotalMediaSize =
|
||||
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxTotalMediaSize();
|
||||
|
||||
// Report that the backup is out of quota if it cannot store a max size media object
|
||||
final boolean quotaExhausted = storedBackupAttributes.bytesUsed() >=
|
||||
(maxTotalMediaSize - BackupManager.MAX_MEDIA_OBJECT_SIZE);
|
||||
// Report that the backup is out of quota if it cannot store a max size media object
|
||||
final boolean quotaExhausted = storedBackupAttributes.bytesUsed() >=
|
||||
(maxTotalMediaSize - BackupManager.MAX_MEDIA_OBJECT_SIZE);
|
||||
|
||||
final Tags tags = Tags.of(
|
||||
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
|
||||
Tag.of("type", backupUser.credentialType().name()),
|
||||
Tag.of("tier", backupUser.backupLevel().name()),
|
||||
Tag.of("quotaExhausted", String.valueOf(quotaExhausted)));
|
||||
final Tags tags = Tags.of(
|
||||
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
|
||||
Tag.of("type", backupUser.credentialType().name()),
|
||||
Tag.of("tier", backupUser.backupLevel().name()),
|
||||
Tag.of("quotaExhausted", String.valueOf(quotaExhausted)));
|
||||
|
||||
DistributionSummary.builder(NUM_OBJECTS_SUMMARY_NAME)
|
||||
.tags(tags)
|
||||
.publishPercentileHistogram()
|
||||
.register(Metrics.globalRegistry)
|
||||
.record(storedBackupAttributes.numObjects());
|
||||
DistributionSummary.builder(BYTES_USED_SUMMARY_NAME)
|
||||
.tags(tags)
|
||||
.publishPercentileHistogram()
|
||||
.register(Metrics.globalRegistry)
|
||||
.record(storedBackupAttributes.bytesUsed());
|
||||
}
|
||||
});
|
||||
DistributionSummary.builder(NUM_OBJECTS_SUMMARY_NAME)
|
||||
.tags(tags)
|
||||
.publishPercentileHistogram()
|
||||
.register(Metrics.globalRegistry)
|
||||
.record(storedBackupAttributes.numObjects());
|
||||
DistributionSummary.builder(BYTES_USED_SUMMARY_NAME)
|
||||
.tags(tags)
|
||||
.publishPercentileHistogram()
|
||||
.register(Metrics.globalRegistry)
|
||||
.record(storedBackupAttributes.bytesUsed());
|
||||
}
|
||||
}
|
||||
|
||||
public record BackupInfo(int cdn, String backupSubdir, String mediaSubdir, String messageBackupKey,
|
||||
@@ -232,15 +230,15 @@ public class BackupManager {
|
||||
* @param backupUser an already ZK authenticated backup user
|
||||
* @return Information about the existing backup
|
||||
*/
|
||||
public CompletableFuture<BackupInfo> backupInfo(final AuthenticatedBackupUser backupUser) {
|
||||
public BackupInfo backupInfo(final AuthenticatedBackupUser backupUser) {
|
||||
checkBackupLevel(backupUser, BackupLevel.FREE);
|
||||
return backupsDb.describeBackup(backupUser)
|
||||
.thenApply(backupDescription -> new BackupInfo(
|
||||
backupDescription.cdn(),
|
||||
backupUser.backupDir(),
|
||||
backupUser.mediaDir(),
|
||||
MESSAGE_BACKUP_NAME,
|
||||
backupDescription.mediaUsedSpace()));
|
||||
final BackupsDb.BackupDescription backupDescription = backupsDb.describeBackup(backupUser).join();
|
||||
return new BackupInfo(
|
||||
backupDescription.cdn(),
|
||||
backupUser.backupDir(),
|
||||
backupUser.mediaDir(),
|
||||
MESSAGE_BACKUP_NAME,
|
||||
backupDescription.mediaUsedSpace());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -461,45 +459,47 @@ public class BackupManager {
|
||||
* @param limit The maximum number of list results to return
|
||||
* @return A {@link ListMediaResult}
|
||||
*/
|
||||
public CompletionStage<ListMediaResult> list(
|
||||
public ListMediaResult list(
|
||||
final AuthenticatedBackupUser backupUser,
|
||||
final Optional<String> cursor,
|
||||
final int limit) {
|
||||
checkBackupLevel(backupUser, BackupLevel.FREE);
|
||||
return remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit)
|
||||
.thenApply(result ->
|
||||
new ListMediaResult(
|
||||
result
|
||||
.objects()
|
||||
.stream()
|
||||
.map(entry -> new StorageDescriptorWithLength(
|
||||
remoteStorageManager.cdnNumber(),
|
||||
decodeMediaIdFromCdn(entry.key()),
|
||||
entry.length()
|
||||
))
|
||||
.toList(),
|
||||
result.cursor()
|
||||
));
|
||||
final RemoteStorageManager.ListResult result =
|
||||
remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit).toCompletableFuture().join();
|
||||
return new ListMediaResult(result
|
||||
.objects()
|
||||
.stream()
|
||||
.map(entry -> new StorageDescriptorWithLength(
|
||||
remoteStorageManager.cdnNumber(),
|
||||
decodeMediaIdFromCdn(entry.key()),
|
||||
entry.length()
|
||||
))
|
||||
.toList(),
|
||||
result.cursor());
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> deleteEntireBackup(final AuthenticatedBackupUser backupUser) {
|
||||
public void deleteEntireBackup(final AuthenticatedBackupUser backupUser) {
|
||||
checkBackupLevel(backupUser, BackupLevel.FREE);
|
||||
|
||||
final int deletionConcurrency =
|
||||
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().deletionConcurrency();
|
||||
|
||||
// Clients only include SVRB data with their messages backup-id
|
||||
final CompletableFuture<Void> svrbRemoval = switch(backupUser.credentialType()) {
|
||||
case BackupCredentialType.MESSAGES -> secureValueRecoveryBClient.removeData(svrbIdentifier(backupUser));
|
||||
case BackupCredentialType.MEDIA -> CompletableFuture.completedFuture(null);
|
||||
};
|
||||
return svrbRemoval.thenCompose(_ -> backupsDb
|
||||
// Try to swap out the backupDir for the user
|
||||
.scheduleBackupDeletion(backupUser)
|
||||
if (backupUser.credentialType() == BackupCredentialType.MESSAGES) {
|
||||
secureValueRecoveryBClient.removeData(svrbIdentifier(backupUser)).join();
|
||||
}
|
||||
try {
|
||||
// Try to swap out the backupDir for the user
|
||||
backupsDb.scheduleBackupDeletion(backupUser).join();
|
||||
} catch (Exception e) {
|
||||
final Throwable unwrapped = ExceptionUtils.unwrap(e);
|
||||
if (unwrapped instanceof BackupsDb.PendingDeletionException) {
|
||||
// If there was already a pending swap, try to delete the cdn objects directly
|
||||
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class, e ->
|
||||
AsyncTimerUtil.record(SYNCHRONOUS_DELETE_TIMER, () ->
|
||||
deletePrefix(backupUser.backupDir(), deletionConcurrency)))));
|
||||
SYNCHRONOUS_DELETE_TIMER.record(() -> deletePrefix(backupUser.backupDir(), deletionConcurrency).join());
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -615,11 +615,34 @@ public class BackupManager {
|
||||
* @param signature An XEd25519 signature of the presentation bytes
|
||||
* @return On authentication success, the authenticated backup-id and backup-tier encoded in the presentation
|
||||
*/
|
||||
public CompletableFuture<AuthenticatedBackupUser> authenticateBackupUser(
|
||||
public AuthenticatedBackupUser authenticateBackupUser(
|
||||
final BackupAuthCredentialPresentation presentation,
|
||||
final byte[] signature,
|
||||
final String userAgentString) {
|
||||
return ExceptionUtils.unwrapSupply(
|
||||
StatusRuntimeException.class,
|
||||
() -> authenticateBackupUserAsync(presentation, signature, userAgentString).join());
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticate the ZK anonymous backup credential's presentation
|
||||
* <p>
|
||||
* This validates:
|
||||
* <li> The presentation was for a credential issued by the server </li>
|
||||
* <li> The credential is in its redemption window </li>
|
||||
* <li> The backup-id matches a previously committed blinded backup-id and server issued receipt level </li>
|
||||
* <li> The signature of the credential matches an existing publicKey associated with this backup-id </li>
|
||||
*
|
||||
* @param presentation A {@link BackupAuthCredentialPresentation}
|
||||
* @param signature An XEd25519 signature of the presentation bytes
|
||||
* @return A future that completes with the authenticated backup-id and backup-tier encoded in the presentation
|
||||
*/
|
||||
public CompletableFuture<AuthenticatedBackupUser> authenticateBackupUserAsync(
|
||||
final BackupAuthCredentialPresentation presentation,
|
||||
final byte[] signature,
|
||||
final String userAgentString) {
|
||||
final PresentationSignatureVerifier signatureVerifier = verifyPresentation(presentation);
|
||||
|
||||
return backupsDb
|
||||
.retrieveAuthenticationData(presentation.getBackupId())
|
||||
.thenApply(optionalAuthenticationData -> {
|
||||
@@ -701,7 +724,6 @@ public class BackupManager {
|
||||
* List and delete all files associated with a prefix
|
||||
*
|
||||
* @param prefixToDelete The prefix to expire.
|
||||
* @return A stage that completes when all objects with the given prefix have been deleted
|
||||
*/
|
||||
private CompletableFuture<Void> deletePrefix(final String prefixToDelete, int concurrentDeletes) {
|
||||
if (prefixToDelete.length() != BackupsDb.BACKUP_DIRECTORY_PATH_LENGTH
|
||||
|
||||
@@ -47,28 +47,28 @@ import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.glassfish.jersey.server.ManagedAsync;
|
||||
import org.signal.libsignal.protocol.ecc.ECPublicKey;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
|
||||
import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
|
||||
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
|
||||
import org.whispersystems.textsecuregcm.auth.RedemptionRange;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupManager;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor;
|
||||
import org.whispersystems.textsecuregcm.backup.CopyParameters;
|
||||
import org.whispersystems.textsecuregcm.backup.CopyResult;
|
||||
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
||||
@@ -83,8 +83,6 @@ import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
|
||||
import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter;
|
||||
import org.whispersystems.textsecuregcm.util.ECPublicKeyAdapter;
|
||||
import org.whispersystems.textsecuregcm.util.ExactlySize;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Path("/v1/archives")
|
||||
@io.swagger.v3.oas.annotations.tags.Tag(name = "Archive")
|
||||
@@ -149,24 +147,20 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "400", description = "The provided backup auth credential request was invalid")
|
||||
@ApiResponse(responseCode = "403", description = "The device did not have permission to set the backup-id. Only the primary device can set the backup-id for an account")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited. Too many attempts to change the backup-id have been made")
|
||||
public CompletionStage<Response> setBackupId(
|
||||
@ManagedAsync
|
||||
public void setBackupId(
|
||||
@Auth final AuthenticatedDevice authenticatedDevice,
|
||||
@Valid @NotNull final SetBackupIdRequest setBackupIdRequest) throws RateLimitExceededException {
|
||||
|
||||
return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())
|
||||
.thenCompose(maybeAccount -> {
|
||||
final Account account = maybeAccount
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier())
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
final Device device = account.getDevice(authenticatedDevice.deviceId())
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
|
||||
final Device device = account.getDevice(authenticatedDevice.deviceId())
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
|
||||
return backupAuthManager
|
||||
.commitBackupId(account, device,
|
||||
Optional.ofNullable(setBackupIdRequest.messagesBackupAuthCredentialRequest),
|
||||
Optional.ofNullable(setBackupIdRequest.mediaBackupAuthCredentialRequest))
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
});
|
||||
backupAuthManager
|
||||
.commitBackupId(account, device,
|
||||
Optional.ofNullable(setBackupIdRequest.messagesBackupAuthCredentialRequest),
|
||||
Optional.ofNullable(setBackupIdRequest.mediaBackupAuthCredentialRequest));
|
||||
}
|
||||
|
||||
|
||||
@@ -186,15 +180,13 @@ public class ArchiveController {
|
||||
""")
|
||||
@ApiResponse(responseCode = "200", description = "Successfully retrieved backup-id rotation limits", useReturnTypeSchema = true)
|
||||
@ApiResponse(responseCode = "403", description = "Invalid account authentication")
|
||||
public CompletionStage<BackupIdLimitResponse> checkLimits(@Auth final AuthenticatedDevice authenticatedDevice) {
|
||||
return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())
|
||||
.thenCompose(maybeAccount -> {
|
||||
final Account account = maybeAccount
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
@ManagedAsync
|
||||
public BackupIdLimitResponse checkLimits(@Auth final AuthenticatedDevice authenticatedDevice) {
|
||||
final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier())
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
|
||||
return backupAuthManager.checkBackupIdRotationLimit(account).thenApply(limit ->
|
||||
new BackupIdLimitResponse(limit.hasPermitsRemaining(), limit.nextPermitAvailable().getSeconds()));
|
||||
});
|
||||
final BackupAuthManager.BackupIdRotationLimit limit = backupAuthManager.checkBackupIdRotationLimit(account);
|
||||
return new BackupIdLimitResponse(limit.hasPermitsRemaining(), limit.nextPermitAvailable().getSeconds());
|
||||
}
|
||||
|
||||
public record RedeemBackupReceiptRequest(
|
||||
@@ -237,18 +229,15 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "400", description = "The provided presentation or receipt was invalid")
|
||||
@ApiResponse(responseCode = "409", description = "The target account does not have a backup-id commitment")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
public CompletionStage<Response> redeemReceipt(
|
||||
@ManagedAsync
|
||||
public void redeemReceipt(
|
||||
@Auth final AuthenticatedDevice authenticatedDevice,
|
||||
@Valid @NotNull final RedeemBackupReceiptRequest redeemBackupReceiptRequest) {
|
||||
|
||||
return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())
|
||||
.thenCompose(maybeAccount -> {
|
||||
final Account account = maybeAccount
|
||||
final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier())
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
|
||||
return backupAuthManager.redeemReceipt(account, redeemBackupReceiptRequest.receiptCredentialPresentation())
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
});
|
||||
backupAuthManager.redeemReceipt(account, redeemBackupReceiptRequest.receiptCredentialPresentation());
|
||||
}
|
||||
|
||||
public record BackupAuthCredentialsResponse(
|
||||
@@ -306,14 +295,15 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "400", description = "The start/end did not meet alignment/duration requirements")
|
||||
@ApiResponse(responseCode = "404", description = "Could not find an existing blinded backup id")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
public CompletionStage<BackupAuthCredentialsResponse> getBackupZKCredentials(
|
||||
@ManagedAsync
|
||||
public BackupAuthCredentialsResponse getBackupZKCredentials(
|
||||
@Auth AuthenticatedDevice authenticatedDevice,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
@NotNull @QueryParam("redemptionStartSeconds") Long startSeconds,
|
||||
@NotNull @QueryParam("redemptionEndSeconds") Long endSeconds) {
|
||||
|
||||
final Map<BackupCredentialType, List<BackupAuthCredentialsResponse.BackupAuthCredential>> credentialsByType =
|
||||
new ConcurrentHashMap<>();
|
||||
new HashMap<>();
|
||||
|
||||
final RedemptionRange redemptionRange;
|
||||
try {
|
||||
@@ -322,33 +312,26 @@ public class ArchiveController {
|
||||
throw new BadRequestException(e.getMessage());
|
||||
}
|
||||
|
||||
return accountsManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier())
|
||||
.thenCompose(maybeAccount -> {
|
||||
final Account account = maybeAccount
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
final Account account = accountsManager.getByAccountIdentifier(authenticatedDevice.accountIdentifier())
|
||||
.orElseThrow(() -> new WebApplicationException(Response.Status.UNAUTHORIZED));
|
||||
|
||||
return CompletableFuture.allOf(Arrays.stream(BackupCredentialType.values())
|
||||
.map(credentialType -> this.backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
credentialType,
|
||||
redemptionRange)
|
||||
.thenAccept(credentials -> {
|
||||
backupMetrics.updateGetCredentialCounter(
|
||||
UserAgentTagUtil.getPlatformTag(userAgent),
|
||||
credentialType,
|
||||
credentials.size());
|
||||
credentialsByType.put(credentialType, credentials.stream()
|
||||
.map(credential -> new BackupAuthCredentialsResponse.BackupAuthCredential(
|
||||
credential.credential().serialize(),
|
||||
credential.redemptionTime().getEpochSecond()))
|
||||
.toList());
|
||||
}))
|
||||
.toArray(CompletableFuture[]::new))
|
||||
.thenApply(ignored -> new BackupAuthCredentialsResponse(credentialsByType.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
e -> BackupAuthCredentialsResponse.CredentialType.fromLibsignalType(e.getKey()),
|
||||
Map.Entry::getValue))));
|
||||
});
|
||||
for (BackupCredentialType credentialType : BackupCredentialType.values()) {
|
||||
final List<BackupAuthManager.Credential> credentials =
|
||||
backupAuthManager.getBackupAuthCredentials(account, credentialType, redemptionRange);
|
||||
backupMetrics.updateGetCredentialCounter(
|
||||
UserAgentTagUtil.getPlatformTag(userAgent),
|
||||
credentialType,
|
||||
credentials.size());
|
||||
credentialsByType.put(credentialType, credentials.stream()
|
||||
.map(credential -> new BackupAuthCredentialsResponse.BackupAuthCredential(
|
||||
credential.credential().serialize(),
|
||||
credential.redemptionTime().getEpochSecond()))
|
||||
.toList());
|
||||
}
|
||||
return new BackupAuthCredentialsResponse(credentialsByType.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
e -> BackupAuthCredentialsResponse.CredentialType.fromLibsignalType(e.getKey()),
|
||||
Map.Entry::getValue)));
|
||||
}
|
||||
|
||||
|
||||
@@ -410,7 +393,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ReadAuthResponse.class)))
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<ReadAuthResponse> readAuth(
|
||||
@ManagedAsync
|
||||
public ReadAuthResponse readAuth(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -426,9 +410,9 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenApply(user -> backupManager.generateReadAuth(user, cdn))
|
||||
.thenApply(ReadAuthResponse::new);
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
return new ReadAuthResponse(backupManager.generateReadAuth(backupUser, cdn));
|
||||
}
|
||||
|
||||
@GET
|
||||
@@ -441,7 +425,8 @@ public class ArchiveController {
|
||||
""")
|
||||
@ApiResponse(responseCode = "200", description = "`JSON` with generated credentials.", useReturnTypeSchema = true)
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<ExternalServiceCredentials> svrbAuth(
|
||||
@ManagedAsync
|
||||
public ExternalServiceCredentials svrbAuth(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -455,9 +440,9 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager
|
||||
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenApply(backupManager::generateSvrbAuth);
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
return backupManager.generateSvrbAuth(backupUser);
|
||||
}
|
||||
|
||||
public record BackupInfoResponse(
|
||||
@@ -491,7 +476,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "404", description = "No existing backups found")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<BackupInfoResponse> backupInfo(
|
||||
@ManagedAsync
|
||||
public BackupInfoResponse backupInfo(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -506,14 +492,15 @@ public class ArchiveController {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
|
||||
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(backupManager::backupInfo)
|
||||
.thenApply(backupInfo -> new BackupInfoResponse(
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
final BackupManager.BackupInfo backupInfo = backupManager.backupInfo(backupUser);
|
||||
return new BackupInfoResponse(
|
||||
backupInfo.cdn(),
|
||||
backupInfo.backupSubdir(),
|
||||
backupInfo.mediaSubdir(),
|
||||
backupInfo.messageBackupKey(),
|
||||
backupInfo.mediaUsedSpace().orElse(0L)));
|
||||
backupInfo.mediaUsedSpace().orElse(0L));
|
||||
}
|
||||
|
||||
public record SetPublicKeyRequest(
|
||||
@@ -531,13 +518,14 @@ public class ArchiveController {
|
||||
summary = "Set public key",
|
||||
description = """
|
||||
Permanently set the public key of an ED25519 key-pair for the backup-id. All requests that provide a anonymous
|
||||
BackupAuthCredentialPresentation (including this one!) must also sign the presentation with the private key
|
||||
BackupAuthCredentialPresentation (including this one!) must also sign the presentation with the private key
|
||||
corresponding to the provided public key.
|
||||
""")
|
||||
@ApiResponseZkAuth
|
||||
@ApiResponse(responseCode = "204", description = "The public key was set")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
public CompletionStage<Response> setPublicKey(
|
||||
@ManagedAsync
|
||||
public void setPublicKey(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
|
||||
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
|
||||
@@ -552,9 +540,7 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager
|
||||
.setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey)
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
backupManager.setPublicKey(presentation.presentation, signature.signature, setPublicKeyRequest.backupIdPublicKey);
|
||||
}
|
||||
|
||||
|
||||
@@ -578,7 +564,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponse(responseCode = "413", description = "The provided uploadLength is larger than the maximum supported upload size. The maximum upload size is subject to change.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<UploadDescriptorResponse> backup(
|
||||
@ManagedAsync
|
||||
public UploadDescriptorResponse backup(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -595,22 +582,25 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(backupUser -> {
|
||||
final boolean oversize = uploadLength
|
||||
.map(length -> length > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE)
|
||||
.orElse(false);
|
||||
backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, uploadLength);
|
||||
if (oversize) {
|
||||
throw new ClientErrorException("exceeded maximum uploadLength", Response.Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
}
|
||||
return backupManager.createMessageBackupUploadDescriptor(backupUser);
|
||||
})
|
||||
.thenApply(result -> new UploadDescriptorResponse(
|
||||
result.cdn(),
|
||||
result.key(),
|
||||
result.headers(),
|
||||
result.signedUploadLocation()));
|
||||
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
|
||||
final boolean oversize = uploadLength
|
||||
.map(length -> length > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE)
|
||||
.orElse(false);
|
||||
|
||||
backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, uploadLength);
|
||||
if (oversize) {
|
||||
throw new ClientErrorException("exceeded maximum uploadLength", Response.Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
}
|
||||
final BackupUploadDescriptor uploadDescriptor =
|
||||
backupManager.createMessageBackupUploadDescriptor(backupUser);
|
||||
return new UploadDescriptorResponse(
|
||||
uploadDescriptor.cdn(),
|
||||
uploadDescriptor.key(),
|
||||
uploadDescriptor.headers(),
|
||||
uploadDescriptor.signedUploadLocation());
|
||||
}
|
||||
|
||||
@GET
|
||||
@@ -627,7 +617,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = UploadDescriptorResponse.class)))
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<UploadDescriptorResponse> uploadTemporaryAttachment(
|
||||
@ManagedAsync
|
||||
public UploadDescriptorResponse uploadTemporaryAttachment(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -638,17 +629,20 @@ public class ArchiveController {
|
||||
|
||||
@Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class))
|
||||
@NotNull
|
||||
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature) {
|
||||
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature)
|
||||
throws RateLimitExceededException {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(backupManager::createTemporaryAttachmentUploadDescriptor)
|
||||
.thenApply(result -> new UploadDescriptorResponse(
|
||||
result.cdn(),
|
||||
result.key(),
|
||||
result.headers(),
|
||||
result.signedUploadLocation()));
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
final BackupUploadDescriptor uploadDescriptor =
|
||||
backupManager.createTemporaryAttachmentUploadDescriptor(backupUser);
|
||||
return new UploadDescriptorResponse(
|
||||
uploadDescriptor.cdn(),
|
||||
uploadDescriptor.key(),
|
||||
uploadDescriptor.headers(),
|
||||
uploadDescriptor.signedUploadLocation());
|
||||
}
|
||||
|
||||
public record CopyMediaRequest(
|
||||
@@ -715,7 +709,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "410", description = "The source object was not found.")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<CopyMediaResponse> copyMedia(
|
||||
@ManagedAsync
|
||||
public CopyMediaResponse copyMedia(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -733,19 +728,20 @@ public class ArchiveController {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
|
||||
return Mono
|
||||
.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent))
|
||||
.flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters()))
|
||||
.next()
|
||||
.doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent)))
|
||||
.map(copyResult -> switch (copyResult.outcome()) {
|
||||
case SUCCESS -> new CopyMediaResponse(copyResult.cdn());
|
||||
case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length");
|
||||
case SOURCE_NOT_FOUND -> throw new ClientErrorException("Source object not found", Response.Status.GONE);
|
||||
case OUT_OF_QUOTA ->
|
||||
throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
}))
|
||||
.toFuture();
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
final CopyResult copyResult =
|
||||
backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters())).next()
|
||||
.blockOptional()
|
||||
.orElseThrow(() -> new IllegalStateException("Non empty copy request must return result"));
|
||||
backupMetrics.updateCopyCounter(copyResult, UserAgentTagUtil.getPlatformTag(userAgent));
|
||||
return switch (copyResult.outcome()) {
|
||||
case SUCCESS -> new CopyMediaResponse(copyResult.cdn());
|
||||
case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length");
|
||||
case SOURCE_NOT_FOUND -> throw new ClientErrorException("Source object not found", Response.Status.GONE);
|
||||
case OUT_OF_QUOTA ->
|
||||
throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE);
|
||||
};
|
||||
}
|
||||
|
||||
public record CopyMediaBatchRequest(
|
||||
@@ -808,13 +804,14 @@ public class ArchiveController {
|
||||
be provided as a separate entry in the response.
|
||||
""")
|
||||
@ApiResponse(responseCode = "207", description = """
|
||||
The request was processed and each operation's outcome must be inspected individually. This does NOT necessarily
|
||||
The request was processed and each operation's outcome must be inspected individually. This does NOT necessarily
|
||||
indicate the operation was a success.
|
||||
""", content = @Content(schema = @Schema(implementation = CopyMediaBatchResponse.class)))
|
||||
@ApiResponse(responseCode = "413", description = "All media capacity has been consumed. Free some space to continue.")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<Response> copyMedia(
|
||||
@ManagedAsync
|
||||
public Response copyMedia(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -833,13 +830,13 @@ public class ArchiveController {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
final Stream<CopyParameters> copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters);
|
||||
return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent))
|
||||
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList()))
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
final List<CopyMediaBatchResponse.Entry> copyResults = backupManager.copyToBackup(backupUser, copyParams.toList())
|
||||
.doOnNext(result -> backupMetrics.updateCopyCounter(result, UserAgentTagUtil.getPlatformTag(userAgent)))
|
||||
.map(CopyMediaBatchResponse.Entry::fromCopyResult)
|
||||
.collectList()
|
||||
.map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build())
|
||||
.toFuture();
|
||||
.collectList().block();
|
||||
return Response.status(207).entity(new CopyMediaBatchResponse(copyResults)).build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@@ -853,7 +850,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "204", description = "The backup was successfully refreshed")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<Response> refresh(
|
||||
@ManagedAsync
|
||||
public void refresh(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -867,10 +865,9 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager
|
||||
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(backupManager::ttlRefresh)
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
backupManager.ttlRefresh(backupUser);
|
||||
}
|
||||
|
||||
record StoredMediaObject(
|
||||
@@ -920,7 +917,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "400", description = "Invalid cursor or limit")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<ListResponse> listMedia(
|
||||
@ManagedAsync
|
||||
public ListResponse listMedia(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -940,16 +938,16 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager
|
||||
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(backupUser -> backupManager.list(backupUser, cursor, limit.orElse(1000))
|
||||
.thenApply(result -> new ListResponse(
|
||||
result.media()
|
||||
.stream().map(entry -> new StoredMediaObject(entry.cdn(), entry.key(), entry.length()))
|
||||
.toList(),
|
||||
backupUser.backupDir(),
|
||||
backupUser.mediaDir(),
|
||||
result.cursor().orElse(null))));
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
final BackupManager.ListMediaResult listResult =
|
||||
backupManager.list(backupUser, cursor, limit.orElse(1000));
|
||||
return new ListResponse(listResult.media()
|
||||
.stream().map(entry -> new StoredMediaObject(entry.cdn(), entry.key(), entry.length()))
|
||||
.toList(),
|
||||
backupUser.backupDir(),
|
||||
backupUser.mediaDir(),
|
||||
listResult.cursor().orElse(null));
|
||||
}
|
||||
|
||||
public record DeleteMedia(@Size(min = 1, max = 1000) List<@Valid MediaToDelete> mediaToDelete) {
|
||||
@@ -976,7 +974,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "204", description = "The provided objects were successfully deleted or they do not exist")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<Response> deleteMedia(
|
||||
@ManagedAsync
|
||||
public void deleteMedia(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -997,12 +996,9 @@ public class ArchiveController {
|
||||
.map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId))
|
||||
.toList();
|
||||
|
||||
return backupManager
|
||||
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(authenticatedBackupUser -> backupManager
|
||||
.deleteMedia(authenticatedBackupUser, toDelete)
|
||||
.then().toFuture())
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
backupManager.deleteMedia(backupUser, toDelete).then().block();
|
||||
}
|
||||
|
||||
@DELETE
|
||||
@@ -1013,7 +1009,8 @@ public class ArchiveController {
|
||||
@ApiResponse(responseCode = "204", description = "The backup has been successfully removed")
|
||||
@ApiResponse(responseCode = "429", description = "Rate limited.")
|
||||
@ApiResponseZkAuth
|
||||
public CompletionStage<Response> deleteBackup(
|
||||
@ManagedAsync
|
||||
public void deleteBackup(
|
||||
@Auth final Optional<AuthenticatedDevice> account,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) final String userAgent,
|
||||
|
||||
@@ -1027,10 +1024,10 @@ public class ArchiveController {
|
||||
if (account.isPresent()) {
|
||||
throw new BadRequestException("must not use authenticated connection for anonymous operations");
|
||||
}
|
||||
return backupManager
|
||||
.authenticateBackupUser(presentation.presentation, signature.signature, userAgent)
|
||||
.thenCompose(backupManager::deleteEntireBackup)
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
final AuthenticatedBackupUser backupUser =
|
||||
backupManager.authenticateBackupUser(presentation.presentation, signature.signature, userAgent);
|
||||
|
||||
backupManager.deleteEntireBackup(backupUser);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -252,8 +252,7 @@ public class DeviceCheckController {
|
||||
switch (request.assertionRequest().action()) {
|
||||
case BACKUP -> backupAuthManager.extendBackupVoucher(
|
||||
account,
|
||||
new Account.BackupVoucher(backupRedemptionLevel, clock.instant().plus(backupRedemptionDuration)))
|
||||
.join();
|
||||
new Account.BackupVoucher(backupRedemptionLevel, clock.instant().plus(backupRedemptionDuration)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,9 +8,7 @@ import com.google.protobuf.ByteString;
|
||||
import io.grpc.Status;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import java.util.concurrent.Flow;
|
||||
import org.signal.chat.backup.CopyMediaRequest;
|
||||
import org.signal.chat.backup.CopyMediaResponse;
|
||||
import org.signal.chat.backup.DeleteAllRequest;
|
||||
@@ -27,29 +25,30 @@ import org.signal.chat.backup.GetUploadFormRequest;
|
||||
import org.signal.chat.backup.GetUploadFormResponse;
|
||||
import org.signal.chat.backup.ListMediaRequest;
|
||||
import org.signal.chat.backup.ListMediaResponse;
|
||||
import org.signal.chat.backup.ReactorBackupsAnonymousGrpc;
|
||||
import org.signal.chat.backup.RefreshRequest;
|
||||
import org.signal.chat.backup.RefreshResponse;
|
||||
import org.signal.chat.backup.SetPublicKeyRequest;
|
||||
import org.signal.chat.backup.SetPublicKeyResponse;
|
||||
import org.signal.chat.backup.SignedPresentation;
|
||||
import org.signal.chat.backup.SimpleBackupsAnonymousGrpc;
|
||||
import org.signal.libsignal.protocol.InvalidKeyException;
|
||||
import org.signal.libsignal.protocol.ecc.ECPublicKey;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
|
||||
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupManager;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupUploadDescriptor;
|
||||
import org.whispersystems.textsecuregcm.backup.CopyParameters;
|
||||
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
|
||||
import org.whispersystems.textsecuregcm.controllers.ArchiveController;
|
||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import reactor.adapter.JdkFlowAdapter;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
|
||||
public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.BackupsAnonymousImplBase {
|
||||
public class BackupsAnonymousGrpcService extends SimpleBackupsAnonymousGrpc.BackupsAnonymousImplBase {
|
||||
|
||||
private final BackupManager backupManager;
|
||||
private final BackupMetrics backupMetrics;
|
||||
@@ -60,87 +59,89 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetCdnCredentialsResponse> getCdnCredentials(final GetCdnCredentialsRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
.map(user -> backupManager.generateReadAuth(user, request.getCdn()))
|
||||
.map(credentials -> GetCdnCredentialsResponse.newBuilder().putAllHeaders(credentials).build());
|
||||
public GetCdnCredentialsResponse getCdnCredentials(final GetCdnCredentialsRequest request) {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
return GetCdnCredentialsResponse.newBuilder()
|
||||
.putAllHeaders(backupManager.generateReadAuth(backupUser, request.getCdn()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetSvrBCredentialsResponse> getSvrBCredentials(final GetSvrBCredentialsRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
.map(backupManager::generateSvrbAuth)
|
||||
.map(credentials -> GetSvrBCredentialsResponse.newBuilder()
|
||||
.setUsername(credentials.username())
|
||||
.setPassword(credentials.password())
|
||||
.build());
|
||||
public GetSvrBCredentialsResponse getSvrBCredentials(final GetSvrBCredentialsRequest request) {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
final ExternalServiceCredentials credentials = backupManager.generateSvrbAuth(backupUser);
|
||||
return GetSvrBCredentialsResponse.newBuilder()
|
||||
.setUsername(credentials.username())
|
||||
.setPassword(credentials.password())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetBackupInfoResponse> getBackupInfo(final GetBackupInfoRequest request) {
|
||||
return Mono.fromFuture(() ->
|
||||
authenticateBackupUser(request.getSignedPresentation()).thenCompose(backupManager::backupInfo))
|
||||
.map(info -> GetBackupInfoResponse.newBuilder()
|
||||
.setBackupName(info.messageBackupKey())
|
||||
.setCdn(info.cdn())
|
||||
.setBackupDir(info.backupSubdir())
|
||||
.setMediaDir(info.mediaSubdir())
|
||||
.setUsedSpace(info.mediaUsedSpace().orElse(0L))
|
||||
.build());
|
||||
public GetBackupInfoResponse getBackupInfo(final GetBackupInfoRequest request) {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
final BackupManager.BackupInfo info = backupManager.backupInfo(backupUser);
|
||||
return GetBackupInfoResponse.newBuilder()
|
||||
.setBackupName(info.messageBackupKey())
|
||||
.setCdn(info.cdn())
|
||||
.setBackupDir(info.backupSubdir())
|
||||
.setMediaDir(info.mediaSubdir())
|
||||
.setUsedSpace(info.mediaUsedSpace().orElse(0L))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<RefreshResponse> refresh(final RefreshRequest request) {
|
||||
return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())
|
||||
.thenCompose(backupManager::ttlRefresh))
|
||||
.thenReturn(RefreshResponse.getDefaultInstance());
|
||||
public RefreshResponse refresh(final RefreshRequest request) {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
backupManager.ttlRefresh(backupUser);
|
||||
return RefreshResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<SetPublicKeyResponse> setPublicKey(final SetPublicKeyRequest request) {
|
||||
public SetPublicKeyResponse setPublicKey(final SetPublicKeyRequest request) {
|
||||
final ECPublicKey publicKey = deserialize(ECPublicKey::new, request.getPublicKey().toByteArray());
|
||||
final BackupAuthCredentialPresentation presentation = deserialize(
|
||||
BackupAuthCredentialPresentation::new,
|
||||
request.getSignedPresentation().getPresentation().toByteArray());
|
||||
final byte[] signature = request.getSignedPresentation().getPresentationSignature().toByteArray();
|
||||
|
||||
return Mono.fromFuture(() -> backupManager.setPublicKey(presentation, signature, publicKey))
|
||||
.thenReturn(SetPublicKeyResponse.getDefaultInstance());
|
||||
backupManager.setPublicKey(presentation, signature, publicKey);
|
||||
return SetPublicKeyResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<GetUploadFormResponse> getUploadForm(final GetUploadFormRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
.flatMap(backupUser -> switch (request.getUploadTypeCase()) {
|
||||
case MESSAGES -> {
|
||||
final long uploadLength = request.getMessages().getUploadLength();
|
||||
final boolean oversize = uploadLength > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE;
|
||||
backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, Optional.of(uploadLength));
|
||||
if (oversize) {
|
||||
yield Mono.error(Status.FAILED_PRECONDITION
|
||||
.withDescription("Exceeds max upload length")
|
||||
.asRuntimeException());
|
||||
}
|
||||
public GetUploadFormResponse getUploadForm(final GetUploadFormRequest request) throws RateLimitExceededException {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
final BackupUploadDescriptor uploadDescriptor = switch (request.getUploadTypeCase()) {
|
||||
case MESSAGES -> {
|
||||
final long uploadLength = request.getMessages().getUploadLength();
|
||||
final boolean oversize = uploadLength > BackupManager.MAX_MESSAGE_BACKUP_OBJECT_SIZE;
|
||||
backupMetrics.updateMessageBackupSizeDistribution(backupUser, oversize, Optional.of(uploadLength));
|
||||
if (oversize) {
|
||||
throw Status.FAILED_PRECONDITION
|
||||
.withDescription("Exceeds max upload length")
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
yield Mono.fromFuture(backupManager.createMessageBackupUploadDescriptor(backupUser));
|
||||
}
|
||||
case MEDIA -> Mono.fromCompletionStage(backupManager.createTemporaryAttachmentUploadDescriptor(backupUser));
|
||||
case UPLOADTYPE_NOT_SET -> Mono.error(Status.INVALID_ARGUMENT
|
||||
.withDescription("Must set upload_type")
|
||||
.asRuntimeException());
|
||||
})
|
||||
.map(uploadDescriptor -> GetUploadFormResponse.newBuilder()
|
||||
.setCdn(uploadDescriptor.cdn())
|
||||
.setKey(uploadDescriptor.key())
|
||||
.setSignedUploadLocation(uploadDescriptor.signedUploadLocation())
|
||||
.putAllHeaders(uploadDescriptor.headers())
|
||||
.build());
|
||||
yield backupManager.createMessageBackupUploadDescriptor(backupUser);
|
||||
}
|
||||
case MEDIA -> backupManager.createTemporaryAttachmentUploadDescriptor(backupUser);
|
||||
case UPLOADTYPE_NOT_SET -> throw Status.INVALID_ARGUMENT
|
||||
.withDescription("Must set upload_type")
|
||||
.asRuntimeException();
|
||||
};
|
||||
return GetUploadFormResponse.newBuilder()
|
||||
.setCdn(uploadDescriptor.cdn())
|
||||
.setKey(uploadDescriptor.key())
|
||||
.setSignedUploadLocation(uploadDescriptor.signedUploadLocation())
|
||||
.putAllHeaders(uploadDescriptor.headers())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<CopyMediaResponse> copyMedia(final CopyMediaRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation())
|
||||
public Flow.Publisher<CopyMediaResponse> copyMedia(final CopyMediaRequest request) {
|
||||
final Flux<CopyMediaResponse> flux = Mono
|
||||
.fromFuture(() -> authenticateBackupUserAsync(request.getSignedPresentation()))
|
||||
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser,
|
||||
request.getItemsList().stream().map(item -> new CopyParameters(
|
||||
item.getSourceAttachmentCdn(), item.getSourceKey(),
|
||||
@@ -167,46 +168,43 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac
|
||||
};
|
||||
return builder.build();
|
||||
});
|
||||
|
||||
return JdkFlowAdapter.publisherToFlowPublisher(flux);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ListMediaResponse> listMedia(final ListMediaRequest request) {
|
||||
return authenticateBackupUserMono(request.getSignedPresentation()).zipWhen(
|
||||
backupUser -> Mono.fromFuture(backupManager.list(
|
||||
backupUser,
|
||||
request.hasCursor() ? Optional.of(request.getCursor()) : Optional.empty(),
|
||||
request.getLimit()).toCompletableFuture()),
|
||||
|
||||
(backupUser, listResult) -> {
|
||||
final ListMediaResponse.Builder builder = ListMediaResponse.newBuilder();
|
||||
for (BackupManager.StorageDescriptorWithLength sd : listResult.media()) {
|
||||
builder.addPage(ListMediaResponse.ListEntry.newBuilder()
|
||||
.setMediaId(ByteString.copyFrom(sd.key()))
|
||||
.setCdn(sd.cdn())
|
||||
.setLength(sd.length())
|
||||
.build());
|
||||
}
|
||||
builder
|
||||
.setBackupDir(backupUser.backupDir())
|
||||
.setMediaDir(backupUser.mediaDir());
|
||||
listResult.cursor().ifPresent(builder::setCursor);
|
||||
return builder.build();
|
||||
});
|
||||
public ListMediaResponse listMedia(final ListMediaRequest request) {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
final BackupManager.ListMediaResult listResult = backupManager.list(
|
||||
backupUser,
|
||||
request.hasCursor() ? Optional.of(request.getCursor()) : Optional.empty(),
|
||||
request.getLimit());
|
||||
|
||||
final ListMediaResponse.Builder builder = ListMediaResponse.newBuilder();
|
||||
for (BackupManager.StorageDescriptorWithLength sd : listResult.media()) {
|
||||
builder.addPage(ListMediaResponse.ListEntry.newBuilder()
|
||||
.setMediaId(ByteString.copyFrom(sd.key()))
|
||||
.setCdn(sd.cdn())
|
||||
.setLength(sd.length())
|
||||
.build());
|
||||
}
|
||||
builder
|
||||
.setBackupDir(backupUser.backupDir())
|
||||
.setMediaDir(backupUser.mediaDir());
|
||||
listResult.cursor().ifPresent(builder::setCursor);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DeleteAllResponse> deleteAll(final DeleteAllRequest request) {
|
||||
return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())
|
||||
.thenCompose(backupManager::deleteEntireBackup))
|
||||
.thenReturn(DeleteAllResponse.getDefaultInstance());
|
||||
public DeleteAllResponse deleteAll(final DeleteAllRequest request) {
|
||||
final AuthenticatedBackupUser backupUser = authenticateBackupUser(request.getSignedPresentation());
|
||||
backupManager.deleteEntireBackup(backupUser);
|
||||
return DeleteAllResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DeleteMediaResponse> deleteMedia(final DeleteMediaRequest request) {
|
||||
return Mono
|
||||
.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation()))
|
||||
public Flow.Publisher<DeleteMediaResponse> deleteMedia(final DeleteMediaRequest request) {
|
||||
return JdkFlowAdapter.publisherToFlowPublisher(Mono
|
||||
.fromFuture(() -> authenticateBackupUserAsync(request.getSignedPresentation()))
|
||||
.flatMapMany(backupUser -> backupManager.deleteMedia(backupUser, request
|
||||
.getItemsList()
|
||||
.stream()
|
||||
@@ -214,20 +212,15 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac
|
||||
.toList()))
|
||||
.map(storageDescriptor -> DeleteMediaResponse.newBuilder()
|
||||
.setMediaId(ByteString.copyFrom(storageDescriptor.key()))
|
||||
.setCdn(storageDescriptor.cdn()).build());
|
||||
.setCdn(storageDescriptor.cdn()).build()));
|
||||
}
|
||||
|
||||
private Mono<AuthenticatedBackupUser> authenticateBackupUserMono(final SignedPresentation signedPresentation) {
|
||||
return Mono.fromFuture(() -> authenticateBackupUser(signedPresentation));
|
||||
}
|
||||
|
||||
private CompletableFuture<AuthenticatedBackupUser> authenticateBackupUser(
|
||||
final SignedPresentation signedPresentation) {
|
||||
private CompletableFuture<AuthenticatedBackupUser> authenticateBackupUserAsync(final SignedPresentation signedPresentation) {
|
||||
if (signedPresentation == null) {
|
||||
throw Status.UNAUTHENTICATED.asRuntimeException();
|
||||
}
|
||||
try {
|
||||
return backupManager.authenticateBackupUser(
|
||||
return backupManager.authenticateBackupUserAsync(
|
||||
new BackupAuthCredentialPresentation(signedPresentation.getPresentation().toByteArray()),
|
||||
signedPresentation.getPresentationSignature().toByteArray(),
|
||||
RequestAttributesUtil.getUserAgent().orElse(null));
|
||||
@@ -236,6 +229,10 @@ public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.Bac
|
||||
}
|
||||
}
|
||||
|
||||
private AuthenticatedBackupUser authenticateBackupUser(final SignedPresentation signedPresentation) {
|
||||
return authenticateBackupUserAsync(signedPresentation).join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an int from a proto uint32 to a signed positive integer, throwing if the value exceeds
|
||||
* {@link Integer#MAX_VALUE}. To convert to a long, see {@link Integer#toUnsignedLong(int)}
|
||||
|
||||
@@ -14,11 +14,11 @@ import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.signal.chat.backup.GetBackupAuthCredentialsRequest;
|
||||
import org.signal.chat.backup.GetBackupAuthCredentialsResponse;
|
||||
import org.signal.chat.backup.ReactorBackupsGrpc;
|
||||
import org.signal.chat.backup.RedeemReceiptRequest;
|
||||
import org.signal.chat.backup.RedeemReceiptResponse;
|
||||
import org.signal.chat.backup.SetBackupIdRequest;
|
||||
import org.signal.chat.backup.SetBackupIdResponse;
|
||||
import org.signal.chat.backup.SimpleBackupsGrpc;
|
||||
import org.signal.chat.common.ZkCredential;
|
||||
import org.signal.libsignal.zkgroup.InvalidInputException;
|
||||
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest;
|
||||
@@ -28,14 +28,14 @@ import org.whispersystems.textsecuregcm.auth.RedemptionRange;
|
||||
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
|
||||
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
|
||||
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
|
||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||
public class BackupsGrpcService extends SimpleBackupsGrpc.BackupsImplBase {
|
||||
|
||||
private final AccountsManager accountManager;
|
||||
private final BackupAuthManager backupAuthManager;
|
||||
@@ -48,7 +48,7 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<SetBackupIdResponse> setBackupId(SetBackupIdRequest request) {
|
||||
public SetBackupIdResponse setBackupId(SetBackupIdRequest request) throws RateLimitExceededException {
|
||||
|
||||
final Optional<BackupAuthCredentialRequest> messagesCredentialRequest = deserializeWithEmptyPresenceCheck(
|
||||
BackupAuthCredentialRequest::new,
|
||||
@@ -59,28 +59,25 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||
request.getMediaBackupAuthCredentialRequest());
|
||||
|
||||
final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice();
|
||||
return authenticatedAccount()
|
||||
.flatMap(account -> {
|
||||
final Device device = account
|
||||
.getDevice(authenticatedDevice.deviceId())
|
||||
.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException);
|
||||
return Mono.fromFuture(
|
||||
backupAuthManager.commitBackupId(account, device, messagesCredentialRequest, mediaCredentialRequest));
|
||||
})
|
||||
.thenReturn(SetBackupIdResponse.getDefaultInstance());
|
||||
final Account account = authenticatedAccount();
|
||||
final Device device = account
|
||||
.getDevice(authenticatedDevice.deviceId())
|
||||
.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException);
|
||||
backupAuthManager.commitBackupId(account, device, messagesCredentialRequest, mediaCredentialRequest);
|
||||
return SetBackupIdResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
public Mono<RedeemReceiptResponse> redeemReceipt(RedeemReceiptRequest request) {
|
||||
public RedeemReceiptResponse redeemReceipt(RedeemReceiptRequest request) {
|
||||
final ReceiptCredentialPresentation receiptCredentialPresentation = deserialize(
|
||||
ReceiptCredentialPresentation::new,
|
||||
request.getPresentation().toByteArray());
|
||||
return authenticatedAccount()
|
||||
.flatMap(account -> Mono.fromFuture(backupAuthManager.redeemReceipt(account, receiptCredentialPresentation)))
|
||||
.thenReturn(RedeemReceiptResponse.getDefaultInstance());
|
||||
final Account account = authenticatedAccount();
|
||||
backupAuthManager.redeemReceipt(account, receiptCredentialPresentation);
|
||||
return RedeemReceiptResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<GetBackupAuthCredentialsResponse> getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) {
|
||||
public GetBackupAuthCredentialsResponse getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) {
|
||||
final Tag platformTag = UserAgentTagUtil.getPlatformTag(RequestAttributesUtil.getUserAgent().orElse(null));
|
||||
final RedemptionRange redemptionRange;
|
||||
try {
|
||||
@@ -90,46 +87,41 @@ public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException();
|
||||
}
|
||||
return authenticatedAccount().flatMap(account -> {
|
||||
final Mono<List<BackupAuthManager.Credential>> messageCredentials = Mono.fromCompletionStage(() ->
|
||||
backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
BackupCredentialType.MESSAGES,
|
||||
redemptionRange))
|
||||
.doOnSuccess(credentials ->
|
||||
backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MESSAGES, credentials.size()));
|
||||
final Account account = authenticatedAccount();
|
||||
final List<BackupAuthManager.Credential> messageCredentials =
|
||||
backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
BackupCredentialType.MESSAGES,
|
||||
redemptionRange);
|
||||
backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MESSAGES, messageCredentials.size());
|
||||
|
||||
final Mono<List<BackupAuthManager.Credential>> mediaCredentials = Mono.fromCompletionStage(() ->
|
||||
backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
BackupCredentialType.MEDIA,
|
||||
redemptionRange))
|
||||
.doOnSuccess(credentials ->
|
||||
backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MEDIA, credentials.size()));
|
||||
final List<BackupAuthManager.Credential> mediaCredentials =
|
||||
backupAuthManager.getBackupAuthCredentials(
|
||||
account,
|
||||
BackupCredentialType.MEDIA,
|
||||
redemptionRange);
|
||||
backupMetrics.updateGetCredentialCounter(platformTag, BackupCredentialType.MEDIA, mediaCredentials.size());
|
||||
|
||||
return messageCredentials.zipWith(mediaCredentials, (messageCreds, mediaCreds) ->
|
||||
GetBackupAuthCredentialsResponse.newBuilder()
|
||||
.putAllMessageCredentials(messageCreds.stream().collect(Collectors.toMap(
|
||||
c -> c.redemptionTime().getEpochSecond(),
|
||||
c -> ZkCredential.newBuilder()
|
||||
.setCredential(ByteString.copyFrom(c.credential().serialize()))
|
||||
.setRedemptionTime(c.redemptionTime().getEpochSecond())
|
||||
.build())))
|
||||
.putAllMediaCredentials(mediaCreds.stream().collect(Collectors.toMap(
|
||||
c -> c.redemptionTime().getEpochSecond(),
|
||||
c -> ZkCredential.newBuilder()
|
||||
.setCredential(ByteString.copyFrom(c.credential().serialize()))
|
||||
.setRedemptionTime(c.redemptionTime().getEpochSecond())
|
||||
.build())))
|
||||
.build());
|
||||
});
|
||||
return GetBackupAuthCredentialsResponse.newBuilder()
|
||||
.putAllMessageCredentials(messageCredentials.stream().collect(Collectors.toMap(
|
||||
c -> c.redemptionTime().getEpochSecond(),
|
||||
c -> ZkCredential.newBuilder()
|
||||
.setCredential(ByteString.copyFrom(c.credential().serialize()))
|
||||
.setRedemptionTime(c.redemptionTime().getEpochSecond())
|
||||
.build())))
|
||||
.putAllMediaCredentials(mediaCredentials.stream().collect(Collectors.toMap(
|
||||
c -> c.redemptionTime().getEpochSecond(),
|
||||
c -> ZkCredential.newBuilder()
|
||||
.setCredential(ByteString.copyFrom(c.credential().serialize()))
|
||||
.setRedemptionTime(c.redemptionTime().getEpochSecond())
|
||||
.build())))
|
||||
.build();
|
||||
}
|
||||
|
||||
private Mono<Account> authenticatedAccount() {
|
||||
final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice();
|
||||
return Mono
|
||||
.fromFuture(() -> accountManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
|
||||
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException));
|
||||
private Account authenticatedAccount() {
|
||||
return accountManager
|
||||
.getByAccountIdentifier(AuthenticationUtil.requireAuthenticatedDevice().accountIdentifier())
|
||||
.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException);
|
||||
}
|
||||
|
||||
private interface Deserializer<T> {
|
||||
|
||||
@@ -14,6 +14,7 @@ import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
|
||||
@@ -46,7 +47,9 @@ public class ErrorMappingInterceptor implements ServerInterceptor {
|
||||
return;
|
||||
}
|
||||
|
||||
final StatusRuntimeException statusException = switch (status.getCause()) {
|
||||
final Throwable cause = ExceptionUtils.unwrap(status.getCause());
|
||||
|
||||
final StatusRuntimeException statusException = switch (cause) {
|
||||
case ConvertibleToGrpcStatus e -> e.toStatusRuntimeException();
|
||||
case UncheckedIOException e -> {
|
||||
log.warn("RPC {} encountered UncheckedIOException", call.getMethodDescriptor().getFullMethodName(), e.getCause());
|
||||
|
||||
@@ -2,6 +2,7 @@ package org.whispersystems.textsecuregcm.util;
|
||||
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public final class ExceptionUtils {
|
||||
|
||||
@@ -81,4 +82,49 @@ public final class ExceptionUtils {
|
||||
throw wrap(fn.apply(e));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the supplier, throwing a checked exception if the supplier throws an exception that unwraps to the provided type
|
||||
*
|
||||
* @param exType The exception type to check for
|
||||
* @param supplier A supplier that produces a T
|
||||
* @return The result of the supplier
|
||||
* @param <T> The supplier type
|
||||
* @param <E> The checked exception type
|
||||
* @throws E If the supplier throws E or a type that {@link #unwrap}s to E
|
||||
*/
|
||||
public static <T, E extends Throwable> T unwrapSupply(Class<E> exType, Supplier<T> supplier) throws E {
|
||||
try {
|
||||
return supplier.get();
|
||||
} catch (RuntimeException e) {
|
||||
final Throwable ex = unwrap(e);
|
||||
if (exType.isInstance(ex)) {
|
||||
throw exType.cast(ex);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the supplier, throwing a checked exception if the supplier throws an exception that unwraps to the provided type
|
||||
*
|
||||
* @param exType The exception type to check for
|
||||
* @param supplier A supplier that produces a T
|
||||
* @param marshal A function that maps from the thrown type to another exception type
|
||||
* @return The result of the supplier
|
||||
* @param <T> The supplier type
|
||||
* @param <E> The checked exception type that may be thrown from supplier
|
||||
* @throws F If the supplier throws E or a type that {@link #unwrap}s to E
|
||||
*/
|
||||
public static <T, E extends Throwable, F extends Throwable> T unwrapSupply(Class<E> exType, Supplier<T> supplier, Function<E, F> marshal) throws F {
|
||||
try {
|
||||
return supplier.get();
|
||||
} catch (RuntimeException e) {
|
||||
final Throwable ex = unwrap(e);
|
||||
if (exType.isInstance(ex)) {
|
||||
throw marshal.apply(exType.cast(ex));
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user