Add a crawler that expires old backups

This commit is contained in:
Ravi Khadiwala
2024-02-23 13:05:53 -06:00
committed by ravi-signal
parent c35a648734
commit de37141812
8 changed files with 577 additions and 7 deletions

View File

@@ -239,6 +239,7 @@ import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.ProcessPushNotificationFeedbackCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredUsernameHoldsCommand;
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
@@ -301,6 +302,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new MessagePersisterServiceCommand());
bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredUsernameHoldsCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredBackupsCommand(Clock.systemUTC()));
bootstrap.addCommand(new ProcessPushNotificationFeedbackCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand());
}

View File

@@ -7,11 +7,13 @@ package org.whispersystems.textsecuregcm.backup;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Status;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HexFormat;
@@ -34,6 +36,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class BackupManager {
@@ -50,6 +53,9 @@ public class BackupManager {
"authorizationFailure");
private static final String USAGE_RECALCULATION_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"usageRecalculation");
private static final String DELETE_MEDIA_COUNT_DISTRIBUTION_NAME = MetricsUtil.name(BackupManager.class,
"deleteMediaCount");
private static final String SUCCESS_TAG_NAME = "success";
private static final String FAILURE_REASON_TAG_NAME = "reason";
@@ -475,6 +481,73 @@ public class BackupManager {
});
}
/**
* List all backups whose media or messages refresh timestamp are older than the provided purgeTime
*
* @param segments Number of segments to read in parallel from the underlying backup database
* @param scheduler Scheduler for running downstream operations
* @param purgeTime If a backup's last message refresh time is strictly before purgeTime, it will be marked as
* requiring full deletion. If only the last refresh time is strictly before purgeTime, it will be
* marked as requiring message deletion. Otherwise, it will not be included in the results.
* @return Flux of backups that require some deletion action
*/
public Flux<ExpiredBackup> getExpiredBackups(final int segments, final Scheduler scheduler, final Instant purgeTime) {
return this.backupsDb.getExpiredBackups(segments, scheduler, purgeTime);
}
/**
* Delete some or all of the objects associated with the backup, and update the backup database.
*
* @param backupTierToRemove If {@link BackupTier#MEDIA}, will only delete media associated with the backup, if
* {@link BackupTier#MESSAGES} will also delete the messageBackup and remove any db record
* of the backup
* @param hashedBackupId The hashed backup-id for the backup
* @return A stage that completes when the deletion operation is finished
*/
public CompletableFuture<Void> deleteBackup(final BackupTier backupTierToRemove, final byte[] hashedBackupId) {
return switch (backupTierToRemove) {
case NONE -> CompletableFuture.completedFuture(null);
// Delete any media associated with the backup id, the message backup, and the row in our backups db table
case MESSAGES -> deleteAllMedia(hashedBackupId)
.thenCompose(ignored -> this.remoteStorageManager.delete(
"%s/%s".formatted(encodeForCdn(hashedBackupId), MESSAGE_BACKUP_NAME)))
.thenCompose(ignored -> this.backupsDb.deleteBackup(hashedBackupId));
// Delete any media associated with the backup id, and clear any used media bytes
case MEDIA -> deleteAllMedia(hashedBackupId).thenCompose(ignore -> backupsDb.clearMediaUsage(hashedBackupId));
};
}
/**
* List and delete all media associated with a backup.
*
* @param hashedBackupId The hashed backup-id for the backup
* @return A stage that completes when all media objects have been deleted
*/
private CompletableFuture<Void> deleteAllMedia(final byte[] hashedBackupId) {
final String mediaPrefix = cdnMediaDirectory(hashedBackupId);
return Mono
.fromCompletionStage(this.remoteStorageManager.list(mediaPrefix, Optional.empty(), 1000))
.expand(listResult -> {
if (listResult.cursor().isEmpty()) {
return Mono.empty();
}
return Mono.fromCompletionStage(() -> this.remoteStorageManager.list(mediaPrefix, listResult.cursor(), 1000));
})
.flatMap(listResult -> Flux.fromIterable(listResult.objects()))
// Delete the media objects. concatMap effectively makes the deletion operation single threaded -- it's expected
// the caller can increase/ concurrency by deleting more backups at once, rather than increasing concurrency
// deleting an individual backup
.concatMap(result -> Mono.fromCompletionStage(() ->
remoteStorageManager.delete("%s%s".formatted(mediaPrefix, result.key()))))
.count()
.doOnSuccess(itemsRemoved -> DistributionSummary.builder(DELETE_MEDIA_COUNT_DISTRIBUTION_NAME)
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry)
.record(itemsRemoved))
.then()
.toFuture();
}
/**
* Verify the presentation and return the extracted backup tier
@@ -541,6 +614,10 @@ public class BackupManager {
return "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
}
private static String cdnMediaDirectory(final byte[] hashedBackupId) {
return "%s/%s/".formatted(encodeForCdn(hashedBackupId), MEDIA_DIRECTORY_NAME);
}
private static String cdnMediaPath(final AuthenticatedBackupUser backupUser, final byte[] mediaId) {
return "%s%s".formatted(cdnMediaDirectory(backupUser), encodeForCdn(mediaId));
}

View File

@@ -1,3 +1,7 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.backup;
import io.grpc.Status;
@@ -12,6 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,11 +24,15 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.Update;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
@@ -64,6 +73,7 @@ public class BackupsDb {
// N: Time in seconds since epoch of last backup media usage recalculation. This timestamp is updated whenever we
// recalculate the up-to-date bytes used by querying the cdn(s) directly.
public static final String ATTR_MEDIA_USAGE_LAST_RECALCULATION = "MBTS";
// BOOL: If true,
public BackupsDb(
final DynamoDbAsyncClient dynamoClient,
@@ -125,12 +135,13 @@ public class BackupsDb {
/**
* Update the quota in the backup table
*
* @param backupUser The backup user
* @param backupUser The backup user
* @param mediaBytesDelta The length of the media after encryption. A negative length implies media being removed
* @param mediaCountDelta The number of media objects being added, or if negative, removed
* @return A stage that completes successfully once the table are updated.
*/
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final long mediaCountDelta, final long mediaBytesDelta) {
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final long mediaCountDelta,
final long mediaBytesDelta) {
final Instant now = clock.instant();
return dynamoClient
.updateItem(
@@ -175,6 +186,14 @@ public class BackupsDb {
.thenRun(Util.NOOP);
}
CompletableFuture<Void> deleteBackup(final byte[] hashedBackupId) {
return dynamoClient.deleteItem(DeleteItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.build())
.thenRun(Util.NOOP);
}
record BackupDescription(int cdn, Optional<Long> mediaUsedSpace) {}
@@ -250,6 +269,66 @@ public class BackupsDb {
.thenRun(Util.NOOP);
}
CompletableFuture<Void> clearMediaUsage(final byte[] hashedBackupId) {
return dynamoClient.updateItem(
new UpdateBuilder(backupTableName, BackupTier.MEDIA, hashedBackupId)
.addSetExpression("#mediaBytesUsed = :mediaBytesUsed",
Map.entry("#mediaBytesUsed", ATTR_MEDIA_BYTES_USED),
Map.entry(":mediaBytesUsed", AttributeValues.n(0L)))
.addSetExpression("#mediaCount = :mediaCount",
Map.entry("#mediaCount", ATTR_MEDIA_COUNT),
Map.entry(":mediaCount", AttributeValues.n(0L)))
.addSetExpression("#mediaRecalc = :mediaRecalc",
Map.entry("#mediaRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION),
Map.entry(":mediaRecalc", AttributeValues.n(clock.instant().getEpochSecond())))
.addRemoveExpression(Map.entry("#mediaRefresh", ATTR_LAST_MEDIA_REFRESH))
.updateItemBuilder()
.build())
.thenRun(Util.NOOP);
}
Flux<ExpiredBackup> getExpiredBackups(final int segments, final Scheduler scheduler, final Instant purgeTime) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");
}
return Flux.range(0, segments)
.parallel()
.runOn(scheduler)
.flatMap(segment -> dynamoClient.scanPaginator(ScanRequest.builder()
.tableName(backupTableName)
.consistentRead(true)
.segment(segment)
.totalSegments(segments)
.expressionAttributeNames(Map.of(
"#backupIdHash", KEY_BACKUP_ID_HASH,
"#refresh", ATTR_LAST_REFRESH,
"#mediaRefresh", ATTR_LAST_MEDIA_REFRESH))
.expressionAttributeValues(Map.of(":purgeTime", AttributeValues.n(purgeTime.getEpochSecond())))
.projectionExpression("#backupIdHash, #refresh, #mediaRefresh")
.filterExpression("(#refresh < :purgeTime) OR (#mediaRefresh < :purgeTime)")
.build())
.items())
.sequential()
.filter(Predicate.not(Map::isEmpty))
.mapNotNull(item -> {
final byte[] hashedBackupId = AttributeValues.getByteArray(item, KEY_BACKUP_ID_HASH, null);
if (hashedBackupId == null) {
return null;
}
final long lastRefresh = AttributeValues.getLong(item, ATTR_LAST_REFRESH, Long.MAX_VALUE);
final long lastMediaRefresh = AttributeValues.getLong(item, ATTR_LAST_MEDIA_REFRESH, Long.MAX_VALUE);
if (lastRefresh < purgeTime.getEpochSecond()) {
return new ExpiredBackup(hashedBackupId, BackupTier.MESSAGES);
} else if (lastMediaRefresh < purgeTime.getEpochSecond()) {
return new ExpiredBackup(hashedBackupId, BackupTier.MEDIA);
} else {
return null;
}
});
}
/**
* Build ddb update statements for the backups table
@@ -257,6 +336,7 @@ public class BackupsDb {
private static class UpdateBuilder {
private final List<String> setStatements = new ArrayList<>();
private final List<String> removeStatements = new ArrayList<>();
private final Map<String, AttributeValue> attrValues = new HashMap<>();
private final Map<String, String> attrNames = new HashMap<>();
@@ -308,6 +388,12 @@ public class BackupsDb {
return this;
}
UpdateBuilder addRemoveExpression(final Map.Entry<String, String> attrName) {
addAttrName(attrName);
removeStatements.add(attrName.getKey());
return this;
}
UpdateBuilder withConditionExpression(final String conditionExpression) {
this.conditionExpression = conditionExpression;
return this;
@@ -369,6 +455,19 @@ public class BackupsDb {
return this;
}
private String updateExpression() {
final StringBuilder sb = new StringBuilder();
if (!setStatements.isEmpty()) {
sb.append("SET ");
sb.append(String.join(",", setStatements));
}
if (!removeStatements.isEmpty()) {
sb.append(" REMOVE ");
sb.append(String.join(",", removeStatements));
}
return sb.toString();
}
/**
* Prepare a non-transactional update
*
@@ -378,7 +477,7 @@ public class BackupsDb {
final UpdateItemRequest.Builder bldr = UpdateItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.updateExpression("SET %s".formatted(String.join(",", setStatements)))
.updateExpression(updateExpression())
.expressionAttributeNames(attrNames)
.expressionAttributeValues(attrValues);
if (this.conditionExpression != null) {
@@ -396,7 +495,7 @@ public class BackupsDb {
final Update.Builder bldr = Update.builder()
.tableName(tableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId)))
.updateExpression("SET %s".formatted(String.join(",", setStatements)))
.updateExpression(updateExpression())
.expressionAttributeNames(attrNames)
.expressionAttributeValues(attrValues);
if (this.conditionExpression != null) {

View File

@@ -0,0 +1,7 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.backup;
public record ExpiredBackup(byte[] hashedBackupId, BackupTier backupTierToRemove) {}

View File

@@ -15,9 +15,15 @@ import java.security.cert.CertificateException;
import java.time.Clock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.signal.libsignal.zkgroup.GenericServerSecretParams;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.WhisperServerService;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.BackupsDb;
import org.whispersystems.textsecuregcm.backup.Cdn3BackupCredentialGenerator;
import org.whispersystems.textsecuregcm.backup.Cdn3RemoteStorageManager;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.controllers.SecureStorageController;
import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controller;
@@ -61,6 +67,7 @@ record CommandDependencies(
KeysManager keysManager,
FaultTolerantRedisCluster cacheCluster,
ClientResources redisClusterClientResources,
BackupManager backupManager,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
static CommandDependencies build(
@@ -105,6 +112,8 @@ record CommandDependencies(
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService remoteStorageExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "remoteStorageRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "storageServiceRetry-%d")).threads(1).build();
@@ -185,6 +194,27 @@ record CommandDependencies(
secureStorageClient, secureValueRecovery2Client, clientPresenceManager,
registrationRecoveryPasswordsManager, accountLockExecutor, clientPresenceExecutor,
clock);
final BackupsDb backupsDb =
new BackupsDb(dynamoDbAsyncClient, configuration.getDynamoDbTables().getBackups().getTableName(), clock);
final GenericServerSecretParams backupsGenericZkSecretParams;
try {
backupsGenericZkSecretParams =
new GenericServerSecretParams(configuration.getBackupsZkConfig().serverSecret().value());
} catch (InvalidInputException e) {
throw new IllegalArgumentException(e);
}
final BackupManager backupManager = new BackupManager(
backupsDb,
backupsGenericZkSecretParams,
new Cdn3BackupCredentialGenerator(configuration.getTus()),
new Cdn3RemoteStorageManager(
remoteStorageExecutor,
configuration.getClientCdnConfiguration().getCircuitBreaker(),
configuration.getClientCdnConfiguration().getRetry(),
configuration.getClientCdnConfiguration().getCaCertificates(),
configuration.getCdn3StorageManagerConfiguration()),
configuration.getClientCdnConfiguration().getAttachmentUrls(),
clock);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(clientPresenceManager);
@@ -200,6 +230,7 @@ record CommandDependencies(
keys,
cacheCluster,
redisClusterClientResources,
backupManager,
dynamicConfigurationManager
);
}

View File

@@ -0,0 +1,166 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.Cli;
import io.dropwizard.core.cli.EnvironmentCommand;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Metrics;
import java.time.Clock;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.ExpiredBackup;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final String SEGMENT_COUNT_ARGUMENT = "segments";
private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final String GRACE_PERIOD_ARGUMENT = "grace-period";
// A backup that has not been refreshed after a grace period is eligible for deletion
private static final Duration DEFAULT_GRACE_PERIOD = Duration.ofDays(60);
private static final int DEFAULT_SEGMENT_COUNT = 1;
private static final int DEFAULT_CONCURRENCY = 16;
private static final String EXPIRED_BACKUPS_COUNTER_NAME = MetricsUtil.name(RemoveExpiredBackupsCommand.class,
"expiredBackups");
private final Clock clock;
public RemoveExpiredBackupsCommand(final Clock clock) {
super(new Application<>() {
@Override
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
}
}, "remove-expired-backups", "Removes backups that have expired");
this.clock = clock;
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--segments")
.type(Integer.class)
.dest(SEGMENT_COUNT_ARGUMENT)
.required(false)
.setDefault(DEFAULT_SEGMENT_COUNT)
.help("The total number of segments for a DynamoDB scan");
subparser.addArgument("--grace-period")
.type(Long.class)
.dest(GRACE_PERIOD_ARGUMENT)
.required(false)
.setDefault(DEFAULT_GRACE_PERIOD.getSeconds())
.help("The number of seconds after which a backup is eligible for removal");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.required(false)
.setDefault(DEFAULT_CONCURRENCY)
.help("Max concurrency for backup expirations. Each expiration may do multiple cdn operations");
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually remove expired backups");
}
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration) throws Exception {
UncaughtExceptionHandler.register();
final CommandDependencies commandDependencies = CommandDependencies.build(getName(), environment, configuration);
MetricsUtil.configureRegistries(configuration, environment, commandDependencies.dynamicConfigurationManager());
final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT_ARGUMENT));
final int concurrency = Objects.requireNonNull(namespace.getInt(MAX_CONCURRENCY_ARGUMENT));
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
final Duration gracePeriod = Duration.ofSeconds(Objects.requireNonNull(namespace.getLong(GRACE_PERIOD_ARGUMENT)));
logger.info("Crawling backups with {} segments and {} processors, grace period {}",
segments,
Runtime.getRuntime().availableProcessors(),
gracePeriod);
try {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.start();
} catch (final Exception e) {
logger.error("Failed to start managed object", e);
throw new RuntimeException(e);
}
});
final AtomicLong backupsExpired = new AtomicLong();
final BackupManager backupManager = commandDependencies.backupManager();
backupManager
.getExpiredBackups(segments, Schedulers.parallel(), clock.instant().plus(gracePeriod))
.flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency)
.doOnNext(ignored -> backupsExpired.incrementAndGet())
.then()
.block();
logger.info("Expired {} backups", backupsExpired.get());
} finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try {
managedObject.stop();
} catch (final Exception e) {
logger.error("Failed to stop managed object", e);
}
});
}
}
private Mono<Void> removeExpiredBackup(
final BackupManager backupManager, final ExpiredBackup expiredBackup,
final boolean dryRun) {
final Mono<Void> mono;
if (dryRun) {
mono = Mono.empty();
} else {
mono = Mono.fromCompletionStage(() ->
backupManager.deleteBackup(expiredBackup.backupTierToRemove(), expiredBackup.hashedBackupId()));
}
return mono
.doOnSuccess(ignored -> Metrics
.counter(EXPIRED_BACKUPS_COUNTER_NAME,
"tier", expiredBackup.backupTierToRemove().name(),
"dryRun", String.valueOf(dryRun))
.increment())
.onErrorResume(throwable -> {
logger.warn("Failed to remove tier {} for backup {}", expiredBackup.backupTierToRemove(),
expiredBackup.hashedBackupId());
return Mono.empty();
});
}
@Override
public void onError(final Cli cli, final Namespace namespace, final Throwable throwable) {
logger.error("Unhandled error", throwable);
}
}