Add DELETE v1/archives

This commit is contained in:
ravi-signal
2024-04-23 16:50:11 -05:00
committed by GitHub
parent b3bd4ccc17
commit 9ef1fee172
6 changed files with 191 additions and 18 deletions

View File

@@ -23,6 +23,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.signal.libsignal.protocol.ecc.Curve;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
@@ -38,6 +39,7 @@ import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -50,8 +52,14 @@ public class BackupManager {
// NOTE(review): presumably the object name of the message backup within a backup directory -- usage is not visible
// in this hunk, confirm against callers
static final String MESSAGE_BACKUP_NAME = "messageBackup";
// 50 GiB expressed in bytes (1024^3 * 50)
static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = 1024L * 1024L * 1024L * 50L;
// 101 MiB expressed in bytes (1024^2 * 101)
static final long MAX_MEDIA_OBJECT_SIZE = 1024L * 1024L * 101L;
// If the last media usage recalculation is over MAX_QUOTA_STALENESS, force a recalculation before quota enforcement.
static final Duration MAX_QUOTA_STALENESS = Duration.ofDays(1);
// How many cdn object deletion requests can be outstanding at a time per backup deletion operation. Passed to
// deletePrefix by deleteEntireBackup when it has to delete the cdn objects directly.
private static final int DELETION_CONCURRENCY = 10;
// Metric names, namespaced under BackupManager via MetricsUtil.name
private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"authorizationFailure");
@@ -59,6 +67,8 @@ public class BackupManager {
"usageRecalculation");
// Name of the distribution summary that deletePrefix records the number of removed objects into
private static final String DELETE_COUNT_DISTRIBUTION_NAME = MetricsUtil.name(BackupManager.class,
"deleteCount");
// Times the direct cdn deletion that deleteEntireBackup falls back to when a deletion was already pending
private static final Timer SYNCHRONOUS_DELETE_TIMER =
Metrics.timer(MetricsUtil.name(BackupManager.class, "synchronousDelete"));
// NOTE(review): presumably tag keys attached to the metrics above -- their usage is not visible here, confirm
private static final String SUCCESS_TAG_NAME = "success";
private static final String FAILURE_REASON_TAG_NAME = "reason";
@@ -317,7 +327,7 @@ public class BackupManager {
* Generate credentials that can be used to read from the backup CDN
*
* @param backupUser an already ZK authenticated backup user
* @param cdnNumber the cdn number to get backup credentials for
* @param cdnNumber the cdn number to get backup credentials for
* @return A map of headers to include with CDN requests
*/
public Map<String, String> generateReadAuth(final AuthenticatedBackupUser backupUser, final int cdnNumber) {
@@ -366,6 +376,16 @@ public class BackupManager {
));
}
/**
 * Delete everything stored for a backup user.
 * <p>
 * The deletion is first scheduled through {@code backupsDb.scheduleBackupDeletion}. If that fails with a
 * {@link BackupsDb.PendingDeletionException} (a deletion was already pending), the objects under the user's
 * backupDir are deleted from the cdn directly, with up to {@code DELETION_CONCURRENCY} deletes in flight, and the
 * direct deletion is timed with {@code SYNCHRONOUS_DELETE_TIMER}.
 *
 * @param backupUser an already ZK authenticated backup user; {@code checkBackupTier} is applied with
 *                   {@link BackupTier#MESSAGES} before any other work is started
 * @return A future that completes when the deletion has been scheduled or, in the already-pending case, when the
 * direct deletion of the cdn objects has finished
 */
public CompletableFuture<Void> deleteEntireBackup(final AuthenticatedBackupUser backupUser) {
  checkBackupTier(backupUser, BackupTier.MESSAGES);

  // Preferred path: swap out the backupDir for the user and let the scheduled deletion pick up the old one
  final CompletableFuture<Void> scheduledDeletion = backupsDb.scheduleBackupDeletion(backupUser);

  // Fallback path: a swap was already pending, so remove the cdn objects ourselves right now
  return scheduledDeletion.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(
      BackupsDb.PendingDeletionException.class,
      alreadyPending -> AsyncTimerUtil.record(
          SYNCHRONOUS_DELETE_TIMER,
          () -> deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY))));
}
private sealed interface Either permits DeleteSuccess, DeleteFailure {}
@@ -494,7 +514,10 @@ public class BackupManager {
*/
public CompletableFuture<Void> expireBackup(final ExpiredBackup expiredBackup) {
// The stages are chained with thenCompose, so each one starts only after the previous stage's future has completed
// successfully: backupsDb.startExpiration, then deletion of the objects under expiredBackup.prefixToDelete(), then
// backupsDb.finishExpiration.
return backupsDb.startExpiration(expiredBackup)
.thenCompose(ignored -> deletePrefix(expiredBackup.prefixToDelete()))
// the deletion operation is effectively single threaded (the second argument, 1, is the number of concurrent
// deletes) -- it's expected that the caller can increase concurrency by deleting more backups at once, rather
// than increasing concurrency deleting an individual backup
.thenCompose(ignored -> deletePrefix(expiredBackup.prefixToDelete(), 1))
// Only reached when the deletion stage completed without error; a failed stage skips the remaining thenCompose
// steps and fails the returned future instead.
.thenCompose(ignored -> backupsDb.finishExpiration(expiredBackup));
}
@@ -504,7 +527,7 @@ public class BackupManager {
* @param prefixToDelete The prefix to expire.
* @return A stage that completes when all objects with the given prefix have been deleted
*/
private CompletableFuture<Void> deletePrefix(final String prefixToDelete) {
private CompletableFuture<Void> deletePrefix(final String prefixToDelete, int concurrentDeletes) {
if (prefixToDelete.length() != BackupsDb.BACKUP_DIRECTORY_PATH_LENGTH
&& prefixToDelete.length() != BackupsDb.MEDIA_DIRECTORY_PATH_LENGTH) {
throw new IllegalArgumentException("Unexpected prefix deletion for " + prefixToDelete);
@@ -519,10 +542,9 @@ public class BackupManager {
return Mono.fromCompletionStage(() -> this.remoteStorageManager.list(prefix, listResult.cursor(), 1000));
})
.flatMap(listResult -> Flux.fromIterable(listResult.objects()))
// Delete the objects. concatMap effectively makes the deletion operation single threaded -- it's expected
// the caller can increase concurrency by deleting more backups at once, rather than increasing concurrency
// deleting an individual backup
.concatMap(result -> Mono.fromCompletionStage(() -> remoteStorageManager.delete(prefix + result.key())))
.flatMap(
result -> Mono.fromCompletionStage(() -> remoteStorageManager.delete(prefix + result.key())),
concurrentDeletes)
.count()
.doOnSuccess(itemsRemoved -> DistributionSummary.builder(DELETE_COUNT_DISTRIBUTION_NAME)
.publishPercentileHistogram(true)

View File

@@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.backup;
import io.grpc.Status;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
@@ -259,6 +260,55 @@ public class BackupsDb {
.thenRun(Util.NOOP);
}
/**
* Indicates that we couldn't schedule a deletion because one was already scheduled. The caller may want to delete the
* objects directly.
*/
class PendingDeletionException extends IOException {}
/**
 * Try to mark a backup as expired and give the user a fresh, empty backupDir.
 * <p>
 * When this completes successfully, the backupDir for the backup-id points at a new empty directory on the cdn, and
 * the row is immediately eligible for expiration via {@link #getExpiredBackups}.
 * <p>
 * When a deletion is already pending, the backupDir is not swapped. Only the expiration timestamps are updated and
 * the existing backupDir stays in place. That outcome is reported with a {@link PendingDeletionException}; the caller
 * should catch it and start the deletion immediately.
 *
 * @param backupUser The backupUser whose data should be eventually deleted
 * @return A future that completes successfully if the user's data is now inaccessible, or with a
 * {@link PendingDeletionException} if the backupDir could not be changed.
 */
CompletableFuture<Void> scheduleBackupDeletion(final AuthenticatedBackupUser backupUser) {
  final byte[] backupIdHash = hashedBackupId(backupUser);

  // One conditional write: reset the usage metadata, rotate the names of everything we intend to delete, and record
  // the intent to delete in attr_expired_prefix. The condition rejects the write when a different prefix is already
  // recorded there.
  final var scheduleDeletion = new UpdateBuilder(backupTableName, BackupTier.MEDIA, backupIdHash)
      .clearMediaUsage(clock)
      .expireDirectoryNames(secureRandom, ExpiredBackup.ExpirationType.ALL)
      .setRefreshTimes(Instant.ofEpochSecond(0))
      .addSetExpression("#expiredPrefix = :expiredPrefix",
          Map.entry("#expiredPrefix", ATTR_EXPIRED_PREFIX),
          Map.entry(":expiredPrefix", AttributeValues.s(backupUser.backupDir())))
      .withConditionExpression("attribute_not_exists(#expiredPrefix) OR #expiredPrefix = :expiredPrefix")
      .updateItemBuilder()
      .build();

  return dynamoClient.updateItem(scheduleDeletion)
      .exceptionallyCompose(ExceptionUtils.exceptionallyHandler(ConditionalCheckFailedException.class, conditionFailed ->
          // A deletion is already pending for this backup-id, most likely because the caller is toggling backups
          // on and off. Deleting the backup directly should be cheap in that situation, so leave the backupDir
          // alone, make sure the row carries expired timestamps, and report that nothing could be scheduled.
          dynamoClient.updateItem(new UpdateBuilder(backupTableName, BackupTier.MEDIA, backupIdHash)
                  .setRefreshTimes(Instant.ofEpochSecond(0))
                  .updateItemBuilder()
                  .build())
              .thenApply(unused -> {
                throw ExceptionUtils.wrap(new PendingDeletionException());
              })))
      .thenRun(Util.NOOP);
}
/**
 * Immutable pair of a cdn value and an optional media-used-space value.
 * <p>
 * NOTE(review): going by the component names, {@code cdn} is presumably the cdn number the backup is stored on and
 * {@code mediaUsedSpace} the media bytes in use when known -- neither producer nor consumer is visible here, confirm.
 */
record BackupDescription(int cdn, Optional<Long> mediaUsedSpace) {}
/**
@@ -349,15 +399,7 @@ public class BackupsDb {
// Clear usage metadata, swap names of things we intend to delete, and record our intent to delete in attr_expired_prefix
return dynamoClient.updateItem(new UpdateBuilder(backupTableName, BackupTier.MEDIA, expiredBackup.hashedBackupId())
.addSetExpression("#mediaBytesUsed = :mediaBytesUsed",
Map.entry("#mediaBytesUsed", ATTR_MEDIA_BYTES_USED),
Map.entry(":mediaBytesUsed", AttributeValues.n(0L)))
.addSetExpression("#mediaCount = :mediaCount",
Map.entry("#mediaCount", ATTR_MEDIA_COUNT),
Map.entry(":mediaCount", AttributeValues.n(0L)))
.addSetExpression("#mediaRecalc = :mediaRecalc",
Map.entry("#mediaRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION),
Map.entry(":mediaRecalc", AttributeValues.n(clock.instant().getEpochSecond())))
.clearMediaUsage(clock)
.expireDirectoryNames(secureRandom, expiredBackup.expirationType())
.addRemoveExpression(Map.entry("#mediaRefresh", ATTR_LAST_MEDIA_REFRESH))
.addSetExpression("#expiredPrefix = :expiredPrefix",
@@ -587,6 +629,19 @@ public class BackupsDb {
return this;
}
/**
 * Add SET expressions that reset the stored media usage: the bytes-used and media-count attributes are set to zero,
 * and the last-usage-recalculation attribute is set to the clock's current time in epoch seconds.
 *
 * @param clock source of the current time recorded as the last recalculation
 * @return this builder, for chaining
 */
UpdateBuilder clearMediaUsage(final Clock clock) {
  // Zero out both usage counters
  this.addSetExpression("#mediaBytesUsed = :mediaBytesUsed",
      Map.entry("#mediaBytesUsed", ATTR_MEDIA_BYTES_USED),
      Map.entry(":mediaBytesUsed", AttributeValues.n(0L)));
  this.addSetExpression("#mediaCount = :mediaCount",
      Map.entry("#mediaCount", ATTR_MEDIA_COUNT),
      Map.entry(":mediaCount", AttributeValues.n(0L)));

  // The usage values are being rewritten now, so stamp the recalculation time with the current instant
  final long recalculatedAtEpochSecond = clock.instant().getEpochSecond();
  this.addSetExpression("#mediaRecalc = :mediaRecalc",
      Map.entry("#mediaRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION),
      Map.entry(":mediaRecalc", AttributeValues.n(recalculatedAtEpochSecond)));

  return this;
}
UpdateBuilder setDirectoryNamesIfMissing(final SecureRandom secureRandom) {
final String backupDir = generateDirName(secureRandom);
final String mediaDir = generateDirName(secureRandom);