Make copy/delete streaming friendly

This commit is contained in:
ravi-signal
2024-06-20 16:00:09 -05:00
committed by GitHub
parent c27898a993
commit 4aadabfac0
6 changed files with 436 additions and 321 deletions

View File

@@ -27,8 +27,8 @@ import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
@@ -54,22 +54,19 @@ import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest;
import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.InvalidLengthException;
import org.whispersystems.textsecuregcm.backup.CopyParameters;
import org.whispersystems.textsecuregcm.backup.CopyResult;
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
import org.whispersystems.textsecuregcm.backup.SourceObjectNotFoundException;
import org.whispersystems.textsecuregcm.util.BackupAuthCredentialAdapter;
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
import org.whispersystems.textsecuregcm.util.ByteArrayBase64UrlAdapter;
import org.whispersystems.textsecuregcm.util.ECPublicKeyAdapter;
import org.whispersystems.textsecuregcm.util.ExactlySize;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.websocket.auth.Mutable;
import org.whispersystems.websocket.auth.ReadOnly;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Path("/v1/archives")
@@ -124,7 +121,7 @@ public class ArchiveController {
public record RedeemBackupReceiptRequest(
@Schema(description = "Presentation of a ZK receipt encoded in standard padded base64", implementation = String.class)
@JsonDeserialize(using = RedeemBackupReceiptRequest.Deserializer.class)
@JsonDeserialize(using = Deserializer.class)
@NotNull
ReceiptCredentialPresentation receiptCredentialPresentation) {
@@ -503,7 +500,16 @@ public class ArchiveController {
@JsonDeserialize(using = ByteArrayAdapter.Deserializing.class)
@NotNull
@ExactlySize(16)
byte[] iv) {}
byte[] iv) {
CopyParameters toCopyParameters() {
return new CopyParameters(
sourceAttachment.cdn, sourceAttachment.key,
objectLength,
new MediaEncryptionParameters(encryptionKey, hmacKey, iv),
mediaId);
}
}
public record CopyMediaResponse(
@Schema(description = "The backup cdn where this media object is stored")
@@ -547,48 +553,20 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(backupUser -> checkMediaFits(backupUser, copyMediaRequest.objectLength)
.thenCompose(ignored -> copyMediaImpl(backupUser, copyMediaRequest)))
.thenApply(result -> new CopyMediaResponse(result.cdn()))
.exceptionally(e -> {
final Throwable unwrapped = ExceptionUtils.unwrap(e);
if (unwrapped instanceof SourceObjectNotFoundException) {
throw new ClientErrorException("Source object not found " + unwrapped.getMessage(), Response.Status.GONE);
} else if (unwrapped instanceof InvalidLengthException) {
throw new BadRequestException("Invalid length " + unwrapped.getMessage());
} else {
throw ExceptionUtils.wrap(e);
}
});
return Mono
.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
.flatMap(backupUser -> backupManager.copyToBackup(backupUser, List.of(copyMediaRequest.toCopyParameters()))
.next()
.map(copyResult -> switch (copyResult.outcome()) {
case SUCCESS -> new CopyMediaResponse(copyResult.cdn());
case SOURCE_WRONG_LENGTH -> throw new BadRequestException("Invalid length");
case SOURCE_NOT_FOUND -> throw new ClientErrorException("Source object not found", Response.Status.GONE);
case OUT_OF_QUOTA ->
throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE);
}))
.toFuture();
}
private CompletableFuture<Void> checkMediaFits(AuthenticatedBackupUser backupUser, long amountToStore) {
return backupManager.canStoreMedia(backupUser, amountToStore)
.thenApply(fits -> {
if (!fits) {
throw new ClientErrorException("Media quota exhausted", Response.Status.REQUEST_ENTITY_TOO_LARGE);
}
return null;
});
}
private CompletionStage<BackupManager.StorageDescriptor> copyMediaImpl(final AuthenticatedBackupUser backupUser,
final CopyMediaRequest copyMediaRequest) {
return this.backupManager.copyToBackup(
backupUser,
copyMediaRequest.sourceAttachment.cdn,
copyMediaRequest.sourceAttachment.key,
copyMediaRequest.objectLength,
new MediaEncryptionParameters(
copyMediaRequest.encryptionKey,
copyMediaRequest.hmacKey,
copyMediaRequest.iv),
copyMediaRequest.mediaId);
}
public record CopyMediaBatchRequest(
@Schema(description = "A list of media objects to copy from the attachments CDN to the backup CDN")
@NotNull
@@ -606,6 +584,7 @@ public class ArchiveController {
A 200 indicates the object was successfully copied.
A 400 indicates an invalid argument in the request
A 410 indicates that the source object was not found
A 413 indicates that the media quota was exhausted
""")
int status,
@@ -620,7 +599,17 @@ public class ArchiveController {
@JsonDeserialize(using = ByteArrayBase64UrlAdapter.Deserializing.class)
@NotNull
@ExactlySize(15)
byte[] mediaId) {}
byte[] mediaId) {
static Entry fromCopyResult(final CopyResult copyResult) {
return switch (copyResult.outcome()) {
case SUCCESS -> new Entry(200, null, copyResult.cdn(), copyResult.mediaId());
case SOURCE_WRONG_LENGTH -> new Entry(400, "Invalid source length", null, copyResult.mediaId());
case SOURCE_NOT_FOUND -> new Entry(410, "Source not found", null, copyResult.mediaId());
case OUT_OF_QUOTA -> new Entry(413, "Media quota exhausted", null, copyResult.mediaId());
};
}
}
}
@PUT
@@ -661,37 +650,13 @@ public class ArchiveController {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
// If the entire batch won't fit in the user's remaining quota, reject the whole request.
final long expectedStorage = copyMediaRequest.items().stream().mapToLong(CopyMediaRequest::objectLength).sum();
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(backupUser -> checkMediaFits(backupUser, expectedStorage).thenCompose(
ignored -> Flux.fromIterable(copyMediaRequest.items)
// Operate sequentially, waiting for one copy to finish before starting the next one. At least right now,
// copying concurrently will introduce contention over the metadata.
.concatMap(request -> Mono
.fromCompletionStage(copyMediaImpl(backupUser, request))
.map(result -> new CopyMediaBatchResponse.Entry(200, null, result.cdn(), result.key()))
.onErrorResume(throwable -> ExceptionUtils.unwrap(throwable) instanceof IOException, throwable -> {
final Throwable unwrapped = ExceptionUtils.unwrap(throwable);
int status;
String error;
if (unwrapped instanceof SourceObjectNotFoundException) {
status = 410;
error = "Source object not found " + unwrapped.getMessage();
} else if (unwrapped instanceof InvalidLengthException) {
status = 400;
error = "Invalid length " + unwrapped.getMessage();
} else {
throw ExceptionUtils.wrap(throwable);
}
return Mono.just(new CopyMediaBatchResponse.Entry(status, error, null, request.mediaId));
}))
.collectList()
.map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build())
.toFuture()));
final Stream<CopyParameters> copyParams = copyMediaRequest.items().stream().map(CopyMediaRequest::toCopyParameters);
return Mono.fromFuture(backupManager.authenticateBackupUser(presentation.presentation, signature.signature))
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser, copyParams.toList()))
.map(CopyMediaBatchResponse.Entry::fromCopyResult)
.collectList()
.map(list -> Response.status(207).entity(new CopyMediaBatchResponse(list)).build())
.toFuture();
}
@POST
@@ -842,12 +807,15 @@ public class ArchiveController {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
final List<BackupManager.StorageDescriptor> toDelete = deleteMedia.mediaToDelete().stream()
.map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId))
.toList();
return backupManager
.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(authenticatedBackupUser -> backupManager.delete(authenticatedBackupUser,
deleteMedia.mediaToDelete().stream()
.map(media -> new BackupManager.StorageDescriptor(media.cdn(), media.mediaId))
.toList()))
.thenCompose(authenticatedBackupUser -> backupManager
.deleteMedia(authenticatedBackupUser, toDelete)
.then().toFuture())
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}