mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 04:58:06 +01:00
Add media deletion endpoint
This commit is contained in:
committed by
ravi-signal
parent
e934ead85c
commit
cc6cf8194f
@@ -11,6 +11,7 @@ import io.micrometer.core.instrument.Metrics;
|
||||
import java.net.URI;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.HexFormat;
|
||||
import java.util.List;
|
||||
@@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class BackupManager {
|
||||
|
||||
@@ -125,12 +128,10 @@ public class BackupManager {
|
||||
*/
|
||||
public CompletableFuture<MessageBackupUploadDescriptor> createMessageBackupUploadDescriptor(
|
||||
final AuthenticatedBackupUser backupUser) {
|
||||
final String encodedBackupId = encodeBackupIdForCdn(backupUser);
|
||||
|
||||
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
|
||||
return backupsDb
|
||||
.addMessageBackup(backupUser)
|
||||
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(encodedBackupId, MESSAGE_BACKUP_NAME));
|
||||
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -194,8 +195,7 @@ public class BackupManager {
|
||||
|
||||
// The user is out of quota, and we have not recently recalculated the user's usage. Double check by doing a
|
||||
// hard recalculation before actually forbidding the user from storing additional media.
|
||||
final String mediaPrefix = "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
|
||||
return this.remoteStorageManager.calculateBytesUsed(mediaPrefix)
|
||||
return this.remoteStorageManager.calculateBytesUsed(cdnMediaDirectory(backupUser))
|
||||
.thenCompose(usage -> backupsDb
|
||||
.setMediaUsage(backupUser, usage)
|
||||
.thenApply(ignored -> usage))
|
||||
@@ -249,15 +249,14 @@ public class BackupManager {
|
||||
}
|
||||
|
||||
final MessageBackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
|
||||
encodeBackupIdForCdn(backupUser),
|
||||
"%s/%s".formatted(MEDIA_DIRECTORY_NAME, encodeForCdn(destinationMediaId)));
|
||||
cdnMediaPath(backupUser, destinationMediaId));
|
||||
|
||||
final int destinationLength = encryptionParameters.outputSize(sourceLength);
|
||||
|
||||
final URI sourceUri = attachmentReadUri(sourceCdn, sourceKey);
|
||||
return this.backupsDb
|
||||
// Write the ddb updates before actually updating backing storage
|
||||
.trackMedia(backupUser, destinationLength)
|
||||
.trackMedia(backupUser, 1, destinationLength)
|
||||
|
||||
// Actually copy the objects. If the copy fails, our estimated quota usage may not be exact
|
||||
.thenComposeAsync(ignored -> remoteStorageManager.copy(sourceUri, sourceLength, encryptionParameters, dst))
|
||||
@@ -267,7 +266,7 @@ public class BackupManager {
|
||||
throw ExceptionUtils.wrap(unwrapped);
|
||||
}
|
||||
// In cases where we know the copy fails without writing anything, we can try to restore the user's quota
|
||||
return this.backupsDb.trackMedia(backupUser, -destinationLength).whenComplete((ignored, ignoredEx) -> {
|
||||
return this.backupsDb.trackMedia(backupUser, -1, -destinationLength).whenComplete((ignored, ignoredEx) -> {
|
||||
throw ExceptionUtils.wrap(unwrapped);
|
||||
});
|
||||
})
|
||||
@@ -335,8 +334,7 @@ public class BackupManager {
|
||||
.withDescription("credential does not support list operation")
|
||||
.asRuntimeException();
|
||||
}
|
||||
final String mediaPrefix = "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
|
||||
return remoteStorageManager.list(mediaPrefix, cursor, limit)
|
||||
return remoteStorageManager.list(cdnMediaDirectory(backupUser), cursor, limit)
|
||||
.thenApply(result ->
|
||||
new ListMediaResult(
|
||||
result
|
||||
@@ -352,6 +350,74 @@ public class BackupManager {
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
private sealed interface Either permits DeleteSuccess, DeleteFailure {}
|
||||
|
||||
private record DeleteSuccess(long usage) implements Either {}
|
||||
|
||||
private record DeleteFailure(Throwable e) implements Either {}
|
||||
|
||||
public CompletableFuture<Void> delete(final AuthenticatedBackupUser backupUser,
|
||||
final List<StorageDescriptor> storageDescriptors) {
|
||||
if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
|
||||
Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
|
||||
throw Status.PERMISSION_DENIED
|
||||
.withDescription("credential does not support list operation")
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
if (storageDescriptors.stream().anyMatch(sd -> sd.cdn() != remoteStorageManager.cdnNumber())) {
|
||||
throw Status.INVALID_ARGUMENT
|
||||
.withDescription("unsupported media cdn provided")
|
||||
.asRuntimeException();
|
||||
}
|
||||
|
||||
return Flux
|
||||
.fromIterable(storageDescriptors)
|
||||
|
||||
// Issue deletes for all storage descriptors (proceeds with default flux concurrency)
|
||||
.flatMap(descriptor -> Mono.fromCompletionStage(
|
||||
remoteStorageManager
|
||||
.delete(cdnMediaPath(backupUser, descriptor.key))
|
||||
// Squash errors/success into a single type
|
||||
.handle((bytesDeleted, throwable) -> throwable != null
|
||||
? new DeleteFailure(throwable)
|
||||
: new DeleteSuccess(bytesDeleted))
|
||||
))
|
||||
|
||||
// Update backupsDb with the change in usage
|
||||
.collectList()
|
||||
.<Void>flatMap(eithers -> {
|
||||
// count up usage changes
|
||||
long totalBytesDeleted = 0;
|
||||
long totalCountDeleted = 0;
|
||||
final List<Throwable> toThrow = new ArrayList<>();
|
||||
for (Either either : eithers) {
|
||||
switch (either) {
|
||||
case DeleteFailure f:
|
||||
toThrow.add(f.e());
|
||||
break;
|
||||
case DeleteSuccess s when s.usage() > 0:
|
||||
totalBytesDeleted += s.usage();
|
||||
totalCountDeleted++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
final Mono<Void> result = toThrow.isEmpty()
|
||||
? Mono.empty()
|
||||
: Mono.error(toThrow.stream().reduce((t1, t2) -> {
|
||||
t1.addSuppressed(t2);
|
||||
return t1;
|
||||
}).get());
|
||||
return Mono
|
||||
.fromCompletionStage(this.backupsDb.trackMedia(backupUser, -totalCountDeleted, -totalBytesDeleted))
|
||||
.then(result);
|
||||
})
|
||||
.toFuture();
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticate the ZK anonymous backup credential's presentation
|
||||
* <p>
|
||||
@@ -452,7 +518,8 @@ public class BackupManager {
|
||||
return encodeForCdn(BackupsDb.hashedBackupId(backupUser.backupId()));
|
||||
}
|
||||
|
||||
private static String encodeForCdn(final byte[] bytes) {
|
||||
@VisibleForTesting
|
||||
static String encodeForCdn(final byte[] bytes) {
|
||||
return Base64.getUrlEncoder().encodeToString(bytes);
|
||||
}
|
||||
|
||||
@@ -460,4 +527,16 @@ public class BackupManager {
|
||||
return Base64.getUrlDecoder().decode(base64);
|
||||
}
|
||||
|
||||
private static String cdnMessageBackupName(final AuthenticatedBackupUser backupUser) {
|
||||
return "%s/%s".formatted(encodeBackupIdForCdn(backupUser), MESSAGE_BACKUP_NAME);
|
||||
}
|
||||
|
||||
private static String cdnMediaDirectory(final AuthenticatedBackupUser backupUser) {
|
||||
return "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
|
||||
}
|
||||
|
||||
private static String cdnMediaPath(final AuthenticatedBackupUser backupUser, final byte[] mediaId) {
|
||||
return "%s%s".formatted(cdnMediaDirectory(backupUser), encodeForCdn(mediaId));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -126,18 +126,19 @@ public class BackupsDb {
|
||||
* Update the quota in the backup table
|
||||
*
|
||||
* @param backupUser The backup user
|
||||
* @param mediaLength The length of the media after encryption. A negative length implies the media is being removed
|
||||
* @param mediaBytesDelta The length of the media after encryption. A negative length implies media being removed
|
||||
* @param mediaCountDelta The number of media objects being added, or if negative, removed
|
||||
* @return A stage that completes successfully once the table are updated.
|
||||
*/
|
||||
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final int mediaLength) {
|
||||
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final long mediaCountDelta, final long mediaBytesDelta) {
|
||||
final Instant now = clock.instant();
|
||||
return dynamoClient
|
||||
.updateItem(
|
||||
// Update the media quota and TTL
|
||||
UpdateBuilder.forUser(backupTableName, backupUser)
|
||||
.setRefreshTimes(now)
|
||||
.incrementMediaBytes(mediaLength)
|
||||
.incrementMediaCount(Integer.signum(mediaLength))
|
||||
.incrementMediaBytes(mediaBytesDelta)
|
||||
.incrementMediaCount(mediaCountDelta)
|
||||
.updateItemBuilder()
|
||||
.build())
|
||||
.thenRun(Util.NOOP);
|
||||
|
||||
@@ -49,11 +49,10 @@ public class Cdn3BackupCredentialGenerator {
|
||||
.build();
|
||||
}
|
||||
|
||||
public MessageBackupUploadDescriptor generateUpload(final String hashedBackupId, final String objectName) {
|
||||
if (hashedBackupId.isBlank() || objectName.isBlank()) {
|
||||
public MessageBackupUploadDescriptor generateUpload(final String key) {
|
||||
if (key.isBlank()) {
|
||||
throw new IllegalArgumentException("Upload descriptors must have non-empty keys");
|
||||
}
|
||||
final String key = "%s/%s".formatted(hashedBackupId, objectName);
|
||||
final String entity = WRITE_ENTITY_PREFIX + key;
|
||||
final ExternalServiceCredentials credentials = credentialsGenerator.generateFor(entity);
|
||||
final String b64Key = Base64.getEncoder().encodeToString(key.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package org.whispersystems.textsecuregcm.backup;
|
||||
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
@@ -21,8 +23,6 @@ import java.util.stream.Stream;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import javax.ws.rs.core.Response;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -168,10 +168,7 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
cursor.ifPresent(s -> queryParams.put("cursor", cursor.get()));
|
||||
|
||||
final HttpRequest request = HttpRequest.newBuilder().GET()
|
||||
.uri(URI.create("%s/%s/%s".formatted(
|
||||
storageManagerBaseUrl,
|
||||
Cdn3BackupCredentialGenerator.CDN_PATH,
|
||||
HttpUtils.queryParamString(queryParams.entrySet()))))
|
||||
.uri(URI.create("%s%s".formatted(listUrl(), HttpUtils.queryParamString(queryParams.entrySet()))))
|
||||
.header(CLIENT_ID_HEADER, clientId)
|
||||
.header(CLIENT_SECRET_HEADER, clientSecret)
|
||||
.build();
|
||||
@@ -226,12 +223,13 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
*/
|
||||
record UsageResponse(@NotNull long numObjects, @NotNull long bytesUsed) {}
|
||||
|
||||
|
||||
@Override
|
||||
public CompletionStage<UsageInfo> calculateBytesUsed(final String prefix) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
final HttpRequest request = HttpRequest.newBuilder().GET()
|
||||
.uri(URI.create("%s/usage%s".formatted(
|
||||
storageManagerBaseUrl,
|
||||
.uri(URI.create("%s%s".formatted(
|
||||
usageUrl(),
|
||||
HttpUtils.queryParamString(Map.of("prefix", prefix).entrySet()))))
|
||||
.header(CLIENT_ID_HEADER, clientId)
|
||||
.header(CLIENT_SECRET_HEADER, clientSecret)
|
||||
@@ -260,5 +258,49 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
|
||||
return new UsageInfo(response.bytesUsed(), response.numObjects);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialized delete response from storage manager
|
||||
*/
|
||||
record DeleteResponse(@NotNull long bytesDeleted) {}
|
||||
|
||||
public CompletionStage<Long> delete(final String key) {
|
||||
final HttpRequest request = HttpRequest.newBuilder().DELETE()
|
||||
.uri(URI.create(deleteUrl(key)))
|
||||
.header(CLIENT_ID_HEADER, clientId)
|
||||
.header(CLIENT_SECRET_HEADER, clientSecret)
|
||||
.build();
|
||||
return this.storageManagerHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
|
||||
.thenApply(response -> {
|
||||
Metrics.counter(STORAGE_MANAGER_STATUS_COUNTER_NAME,
|
||||
OPERATION_TAG_NAME, "delete",
|
||||
STATUS_TAG_NAME, Integer.toString(response.statusCode()))
|
||||
.increment();
|
||||
try {
|
||||
return parseDeleteResponse(response);
|
||||
} catch (IOException e) {
|
||||
throw ExceptionUtils.wrap(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private long parseDeleteResponse(final HttpResponse<InputStream> httpDeleteResponse) throws IOException {
|
||||
if (!HttpUtils.isSuccessfulResponse(httpDeleteResponse.statusCode())) {
|
||||
throw new IOException("Failed to retrieve usage: " + httpDeleteResponse.statusCode());
|
||||
}
|
||||
return SystemMapper.jsonMapper().readValue(httpDeleteResponse.body(), DeleteResponse.class).bytesDeleted();
|
||||
}
|
||||
|
||||
private String deleteUrl(final String key) {
|
||||
return "%s/%s/%s".formatted(storageManagerBaseUrl, Cdn3BackupCredentialGenerator.CDN_PATH, key);
|
||||
}
|
||||
|
||||
private String usageUrl() {
|
||||
return "%s/usage".formatted(storageManagerBaseUrl);
|
||||
}
|
||||
|
||||
private String listUrl() {
|
||||
return "%s/%s/".formatted(storageManagerBaseUrl, Cdn3BackupCredentialGenerator.CDN_PATH);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -74,4 +74,12 @@ public interface RemoteStorageManager {
|
||||
* @return The number of bytes used
|
||||
*/
|
||||
CompletionStage<UsageInfo> calculateBytesUsed(final String prefix);
|
||||
|
||||
/**
|
||||
* Delete the specified object.
|
||||
*
|
||||
* @param key the key of the stored object to delete.
|
||||
* @return the number of bytes freed by the deletion operation
|
||||
*/
|
||||
CompletionStage<Long> delete(final String key);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user