Add new upload-for-copy backup endpoint

This commit is contained in:
ravi-signal
2024-04-15 13:47:46 -05:00
committed by GitHub
parent e5d654f0c7
commit d36df3eaa9
13 changed files with 202 additions and 63 deletions

View File

@@ -709,6 +709,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ServerZkAuthOperations zkAuthOperations = new ServerZkAuthOperations(zkSecretParams);
ServerZkReceiptOperations zkReceiptOperations = new ServerZkReceiptOperations(zkSecretParams);
TusAttachmentGenerator tusAttachmentGenerator = new TusAttachmentGenerator(config.getTus());
Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator = new Cdn3BackupCredentialGenerator(config.getTus());
BackupAuthManager backupAuthManager = new BackupAuthManager(experimentEnrollmentManager, rateLimiters,
accountsManager, zkReceiptOperations, redeemedReceiptsManager, backupsGenericZkSecretParams, clock);
@@ -719,6 +720,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
BackupManager backupManager = new BackupManager(
backupsDb,
backupsGenericZkSecretParams,
rateLimiters,
tusAttachmentGenerator,
cdn3BackupCredentialGenerator,
new Cdn3RemoteStorageManager(
remoteStorageExecutor,
@@ -947,7 +950,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getAwsAttachmentsConfiguration().accessSecret().value(),
config.getAwsAttachmentsConfiguration().region(), config.getAwsAttachmentsConfiguration().bucket()),
new AttachmentControllerV3(rateLimiters, gcsAttachmentGenerator),
new AttachmentControllerV4(rateLimiters, gcsAttachmentGenerator, new TusAttachmentGenerator(config.getTus()),
new AttachmentControllerV4(rateLimiters, gcsAttachmentGenerator, tusAttachmentGenerator,
experimentEnrollmentManager),
new ArchiveController(backupAuthManager, backupManager),
new CallRoutingController(rateLimiters, callRouter, turnTokenGenerator),

View File

@@ -11,6 +11,7 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.net.URI;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
@@ -30,7 +31,12 @@ import org.signal.libsignal.zkgroup.VerificationFailedException;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.attachments.AttachmentGenerator;
import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
@@ -59,21 +65,28 @@ public class BackupManager {
private final BackupsDb backupsDb;
private final GenericServerSecretParams serverSecretParams;
private final RateLimiters rateLimiters;
private final TusAttachmentGenerator tusAttachmentGenerator;
private final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator;
private final RemoteStorageManager remoteStorageManager;
private final Map<Integer, String> attachmentCdnBaseUris;
private final SecureRandom secureRandom = new SecureRandom();
private final Clock clock;
public BackupManager(
final BackupsDb backupsDb,
final GenericServerSecretParams serverSecretParams,
final RateLimiters rateLimiters,
final TusAttachmentGenerator tusAttachmentGenerator,
final Cdn3BackupCredentialGenerator cdn3BackupCredentialGenerator,
final RemoteStorageManager remoteStorageManager,
final Map<Integer, String> attachmentCdnBaseUris,
final Clock clock) {
this.backupsDb = backupsDb;
this.serverSecretParams = serverSecretParams;
this.rateLimiters = rateLimiters;
this.tusAttachmentGenerator = tusAttachmentGenerator;
this.cdn3BackupCredentialGenerator = cdn3BackupCredentialGenerator;
this.remoteStorageManager = remoteStorageManager;
this.clock = clock;
@@ -131,26 +144,38 @@ public class BackupManager {
* @param backupUser an already ZK authenticated backup user
* @return the upload form
*/
public CompletableFuture<MessageBackupUploadDescriptor> createMessageBackupUploadDescriptor(
public CompletableFuture<BackupUploadDescriptor> createMessageBackupUploadDescriptor(
final AuthenticatedBackupUser backupUser) {
checkBackupTier(backupUser, BackupTier.MESSAGES);
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return backupsDb
.addMessageBackup(backupUser)
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)));
}
public BackupUploadDescriptor createTemporaryAttachmentUploadDescriptor(final AuthenticatedBackupUser backupUser)
throws RateLimitExceededException {
checkBackupTier(backupUser, BackupTier.MEDIA);
RateLimiter.adaptLegacyException(() -> rateLimiters
.forDescriptor(RateLimiters.For.BACKUP_ATTACHMENT)
.validate(rateLimitKey(backupUser)));
final byte[] bytes = new byte[15];
secureRandom.nextBytes(bytes);
final String attachmentKey = Base64.getUrlEncoder().encodeToString(bytes);
final AttachmentGenerator.Descriptor descriptor = tusAttachmentGenerator.generateAttachment(attachmentKey);
return new BackupUploadDescriptor(3, attachmentKey, descriptor.headers(), descriptor.signedUploadLocation());
}
/**
* Update the last update timestamps for the backupId in the presentation
*
* @param backupUser an already ZK authenticated backup user
*/
public CompletableFuture<Void> ttlRefresh(final AuthenticatedBackupUser backupUser) {
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support ttl operation")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MESSAGES);
// update message backup TTL
return backupsDb.ttlRefresh(backupUser);
}
@@ -165,11 +190,7 @@ public class BackupManager {
* @return Information about the existing backup
*/
public CompletableFuture<BackupInfo> backupInfo(final AuthenticatedBackupUser backupUser) {
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED.withDescription("credential does not support info operation")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MESSAGES);
return backupsDb.describeBackup(backupUser)
.thenApply(backupDescription -> new BackupInfo(
backupDescription.cdn(),
@@ -187,12 +208,7 @@ public class BackupManager {
* @return true if mediaLength bytes can be stored
*/
public CompletableFuture<Boolean> canStoreMedia(final AuthenticatedBackupUser backupUser, final long mediaLength) {
if (backupUser.backupTier().compareTo(BackupTier.MEDIA) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support storing media")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MEDIA);
return backupsDb.getMediaUsage(backupUser)
.thenComposeAsync(info -> {
final boolean canStore = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed() >= mediaLength;
@@ -243,12 +259,7 @@ public class BackupManager {
final int sourceLength,
final MediaEncryptionParameters encryptionParameters,
final byte[] destinationMediaId) {
if (backupUser.backupTier().compareTo(BackupTier.MEDIA) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support storing media")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MEDIA);
if (sourceLength > MAX_MEDIA_OBJECT_SIZE) {
throw Status.INVALID_ARGUMENT
.withDescription("Invalid sourceObject size")
@@ -262,7 +273,7 @@ public class BackupManager {
return CompletableFuture.failedFuture(e);
}
final MessageBackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
final BackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
cdnMediaPath(backupUser, destinationMediaId));
final int destinationLength = encryptionParameters.outputSize(sourceLength);
@@ -309,12 +320,7 @@ public class BackupManager {
* @return A map of headers to include with CDN requests
*/
public Map<String, String> generateReadAuth(final AuthenticatedBackupUser backupUser) {
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support read auth operation")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MESSAGES);
return cdn3BackupCredentialGenerator.readHeaders(backupUser.backupDir());
}
@@ -339,12 +345,7 @@ public class BackupManager {
final AuthenticatedBackupUser backupUser,
final Optional<String> cursor,
final int limit) {
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support list operation")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MESSAGES);
return remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit)
.thenApply(result ->
new ListMediaResult(
@@ -370,12 +371,7 @@ public class BackupManager {
public CompletableFuture<Void> delete(final AuthenticatedBackupUser backupUser,
final List<StorageDescriptor> storageDescriptors) {
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support list operation")
.asRuntimeException();
}
checkBackupTier(backupUser, BackupTier.MESSAGES);
if (storageDescriptors.stream().anyMatch(sd -> sd.cdn() != remoteStorageManager.cdnNumber())) {
throw Status.INVALID_ARGUMENT
@@ -430,6 +426,7 @@ public class BackupManager {
}
private static final ECPublicKey INVALID_PUBLIC_KEY = Curve.generateKeyPair().getPublicKey();
/**
* Authenticate the ZK anonymous backup credential's presentation
* <p>
@@ -532,6 +529,7 @@ public class BackupManager {
}
interface PresentationSignatureVerifier {
BackupTier verifySignature(byte[] signature, ECPublicKey publicKey);
}
@@ -576,6 +574,22 @@ public class BackupManager {
};
}
/**
* Check that the authenticated backup user is authorized to use the provided backupTier
*
* @param backupUser The backup user to check
* @param backupTier The authorization level to verify the backupUser has access to
* @throws {@link Status#PERMISSION_DENIED} error if the backup user is not authorized to access {@code backupTier}
*/
private static void checkBackupTier(final AuthenticatedBackupUser backupUser, final BackupTier backupTier) {
if (backupUser.backupTier().compareTo(backupTier) < 0) {
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
throw Status.PERMISSION_DENIED
.withDescription("credential does not support the requested operation")
.asRuntimeException();
}
}
@VisibleForTesting
static String encodeMediaIdForCdn(final byte[] bytes) {
return Base64.getUrlEncoder().encodeToString(bytes);
@@ -596,4 +610,8 @@ public class BackupManager {
private static String cdnMediaPath(final AuthenticatedBackupUser backupUser, final byte[] mediaId) {
return "%s%s".formatted(cdnMediaDirectory(backupUser), encodeMediaIdForCdn(mediaId));
}
static String rateLimitKey(final AuthenticatedBackupUser backupUser) {
return Base64.getEncoder().encodeToString(BackupsDb.hashedBackupId(backupUser.backupId()));
}
}

View File

@@ -7,7 +7,7 @@ package org.whispersystems.textsecuregcm.backup;
import java.util.Map;
public record MessageBackupUploadDescriptor(
public record BackupUploadDescriptor(
int cdn,
String key,
Map<String, String> headers,

View File

@@ -49,7 +49,7 @@ public class Cdn3BackupCredentialGenerator {
.build();
}
public MessageBackupUploadDescriptor generateUpload(final String key) {
public BackupUploadDescriptor generateUpload(final String key) {
if (key.isBlank()) {
throw new IllegalArgumentException("Upload descriptors must have non-empty keys");
}
@@ -60,7 +60,7 @@ public class Cdn3BackupCredentialGenerator {
HttpHeaders.AUTHORIZATION, HeaderUtils.basicAuthHeader(credentials),
"Upload-Metadata", String.format("filename %s", b64Key));
return new MessageBackupUploadDescriptor(
return new BackupUploadDescriptor(
BACKUP_CDN,
key,
headers,

View File

@@ -113,7 +113,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
final URI sourceUri,
final int expectedSourceLength,
final MediaEncryptionParameters encryptionParameters,
final MessageBackupUploadDescriptor uploadDescriptor) {
final BackupUploadDescriptor uploadDescriptor) {
if (uploadDescriptor.cdn() != cdnNumber()) {
throw new IllegalArgumentException("Cdn3RemoteStorageManager can only copy to cdn3");
@@ -152,7 +152,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
private HttpRequest createCopyRequest(
final int expectedSourceLength,
final MessageBackupUploadDescriptor uploadDescriptor,
final BackupUploadDescriptor uploadDescriptor,
BackupMediaEncrypter encrypter,
HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) throws IOException {
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {

View File

@@ -36,7 +36,7 @@ public interface RemoteStorageManager {
URI sourceUri,
int expectedSourceLength,
MediaEncryptionParameters encryptionParameters,
MessageBackupUploadDescriptor uploadDescriptor);
BackupUploadDescriptor uploadDescriptor);
/**
* Result of a {@link #list} operation

View File

@@ -384,7 +384,7 @@ public class ArchiveController {
}
public record MessageBackupResponse(
public record UploadDescriptorResponse(
@Schema(description = "Indicates the CDN type. 3 indicates resumable uploads using TUS")
int cdn,
@Schema(description = "The location within the specified cdn where the finished upload can be found.")
@@ -400,10 +400,10 @@ public class ArchiveController {
@Operation(
summary = "Fetch message backup upload form",
description = "Retrieve an upload form that can be used to perform a resumable upload of a message backup.")
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = MessageBackupResponse.class)))
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = UploadDescriptorResponse.class)))
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<MessageBackupResponse> backup(
public CompletionStage<UploadDescriptorResponse> backup(
@ReadOnly @Auth final Optional<AuthenticatedAccount> account,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@@ -418,7 +418,49 @@ public class ArchiveController {
}
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
.thenCompose(backupManager::createMessageBackupUploadDescriptor)
.thenApply(result -> new MessageBackupResponse(
.thenApply(result -> new UploadDescriptorResponse(
result.cdn(),
result.key(),
result.headers(),
result.signedUploadLocation()));
}
@GET
@Path("/media/upload/form")
@Produces(MediaType.APPLICATION_JSON)
@Operation(
summary = "Fetch media attachment upload form",
description = """
Retrieve an upload form that can be used to perform a resumable upload of an attachment. After uploading, the
attachment can be copied into the backup at PUT /archives/media/.
Like the account authenticated version at /attachments, the uploaded object is only temporary.
""")
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = UploadDescriptorResponse.class)))
@ApiResponse(responseCode = "429", description = "Rate limited.")
@ApiResponseZkAuth
public CompletionStage<UploadDescriptorResponse> uploadTemporaryAttachment(
@ReadOnly @Auth final Optional<AuthenticatedAccount> account,
@Parameter(description = BackupAuthCredentialPresentationHeader.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH) final ArchiveController.BackupAuthCredentialPresentationHeader presentation,
@Parameter(description = BackupAuthCredentialPresentationSignature.DESCRIPTION, schema = @Schema(implementation = String.class))
@NotNull
@HeaderParam(X_SIGNAL_ZK_AUTH_SIGNATURE) final BackupAuthCredentialPresentationSignature signature) {
if (account.isPresent()) {
throw new BadRequestException("must not use authenticated connection for anonymous operations");
}
return backupManager.authenticateBackupUser(presentation.presentation, signature.signature)
.thenApply(backupUser -> {
try {
return backupManager.createTemporaryAttachmentUploadDescriptor(backupUser);
} catch (RateLimitExceededException e) {
throw ExceptionUtils.wrap(e);
}
})
.thenApply(result -> new UploadDescriptorResponse(
result.cdn(),
result.key(),
result.headers(),

View File

@@ -26,6 +26,7 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
VERIFY("verify", false, new RateLimiterConfig(6, Duration.ofSeconds(30))),
PIN("pin", false, new RateLimiterConfig(10, Duration.ofDays(1))),
ATTACHMENT("attachmentCreate", false, new RateLimiterConfig(50, Duration.ofMillis(1200))),
BACKUP_ATTACHMENT("backupAttachmentCreate", true, new RateLimiterConfig(10_000, Duration.ofSeconds(1))),
PRE_KEYS("prekeys", false, new RateLimiterConfig(6, Duration.ofMinutes(10))),
MESSAGES("messages", false, new RateLimiterConfig(60, Duration.ofSeconds(1))),
STORIES("stories", false, new RateLimiterConfig(5_000, Duration.ofSeconds(8))),

View File

@@ -19,6 +19,7 @@ import org.signal.libsignal.zkgroup.GenericServerSecretParams;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.WhisperServerService;
import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.BackupsDb;
@@ -27,6 +28,7 @@ import org.whispersystems.textsecuregcm.backup.Cdn3RemoteStorageManager;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.controllers.SecureStorageController;
import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controller;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
@@ -198,6 +200,8 @@ record CommandDependencies(
secureStorageClient, secureValueRecovery2Client, clientPresenceManager,
registrationRecoveryPasswordsManager, accountLockExecutor, clientPresenceExecutor,
clock);
RateLimiters rateLimiters = RateLimiters.createAndValidate(configuration.getLimitsConfiguration(),
dynamicConfigurationManager, rateLimitersCluster);
final BackupsDb backupsDb =
new BackupsDb(dynamoDbAsyncClient, configuration.getDynamoDbTables().getBackups().getTableName(), clock);
final GenericServerSecretParams backupsGenericZkSecretParams;
@@ -210,6 +214,8 @@ record CommandDependencies(
final BackupManager backupManager = new BackupManager(
backupsDb,
backupsGenericZkSecretParams,
rateLimiters,
new TusAttachmentGenerator(configuration.getTus()),
new Cdn3BackupCredentialGenerator(configuration.getTus()),
new Cdn3RemoteStorageManager(
remoteStorageExecutor,