Make backup batch operation concurrency configurable

This commit is contained in:
Ravi Khadiwala
2025-09-08 13:25:27 -05:00
committed by ravi-signal
parent efde8a31f9
commit 8c2d738924
6 changed files with 110 additions and 40 deletions

View File

@@ -806,7 +806,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
cdn3RemoteStorageManager,
svrbCredentialsGenerator,
secureValueRecoveryBClient,
clock);
clock,
dynamicConfigurationManager);
final AppleDeviceChecks appleDeviceChecks = new AppleDeviceChecks(
dynamoDbClient,

View File

@@ -40,10 +40,13 @@ import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicBackupConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Pair;
@@ -61,20 +64,6 @@ public class BackupManager {
public static final long MAX_MESSAGE_BACKUP_OBJECT_SIZE = DataSize.mebibytes(101).toBytes();
public static final long MAX_MEDIA_OBJECT_SIZE = DataSize.mebibytes(101).toBytes();
// If the last media usage recalculation is over MAX_QUOTA_STALENESS, force a recalculation before quota enforcement.
static final Duration MAX_QUOTA_STALENESS = Duration.ofDays(1);
// How many cdn object deletion requests can be outstanding at a time per backup deletion operation
private static final int DELETION_CONCURRENCY = 10;
// How many cdn object copy requests can be outstanding at a time per batch copy-to-backup operation
private static final int COPY_CONCURRENCY = 10;
// How often we should persist the current usage
@VisibleForTesting
static int USAGE_CHECKPOINT_COUNT = 10;
private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"authorizationFailure");
@@ -100,6 +89,7 @@ public class BackupManager {
private final ExternalServiceCredentialsGenerator secureValueRecoveryBCredentialsGenerator;
private final SecureValueRecoveryClient secureValueRecoveryBClient;
private final Clock clock;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
public BackupManager(
final BackupsDb backupsDb,
@@ -110,7 +100,8 @@ public class BackupManager {
final RemoteStorageManager remoteStorageManager,
final ExternalServiceCredentialsGenerator secureValueRecoveryBCredentialsGenerator,
final SecureValueRecoveryClient secureValueRecoveryBClient,
final Clock clock) {
final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.backupsDb = backupsDb;
this.serverSecretParams = serverSecretParams;
this.rateLimiters = rateLimiters;
@@ -120,6 +111,7 @@ public class BackupManager {
this.secureValueRecoveryBClient = secureValueRecoveryBClient;
this.clock = clock;
this.secureValueRecoveryBCredentialsGenerator = secureValueRecoveryBCredentialsGenerator;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@@ -238,6 +230,9 @@ public class BackupManager {
checkBackupLevel(backupUser, BackupLevel.PAID);
checkBackupCredentialType(backupUser, BackupCredentialType.MEDIA);
final DynamicBackupConfiguration backupConfiguration =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration();
return Mono.fromFuture(() -> allowedCopies(backupUser, toCopy))
.flatMapMany(quotaResult -> Flux.concat(
@@ -247,7 +242,7 @@ public class BackupManager {
Flux.fromIterable(quotaResult.requestsToCopy())
// Update the usage in reasonable chunk sizes to bound how out of sync our claimed and actual usage gets
.buffer(USAGE_CHECKPOINT_COUNT)
.buffer(backupConfiguration.usageCheckpointCount())
.concatMap(copyParameters -> {
final long quotaToConsume = copyParameters.stream()
.mapToLong(CopyParameters::destinationObjectSize)
@@ -265,7 +260,7 @@ public class BackupManager {
.fromFuture(this.backupsDb.trackMedia(backupUser, -1, -copyParams.destinationObjectSize()))
.thenReturn(copyResult);
}),
COPY_CONCURRENCY, 1),
backupConfiguration.copyConcurrency(), 1),
// There wasn't enough quota remaining to perform these copies
Flux.fromIterable(quotaResult.requestsToReject())
@@ -317,11 +312,14 @@ public class BackupManager {
})
.sum();
final Duration maxQuotaStaleness =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxQuotaStaleness();
return backupsDb.getMediaUsage(backupUser)
.thenComposeAsync(info -> {
long remainingQuota = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed();
final boolean canStore = remainingQuota >= totalBytesAdded;
if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(MAX_QUOTA_STALENESS))) {
if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(maxQuotaStaleness))) {
return CompletableFuture.completedFuture(remainingQuota);
}
@@ -456,6 +454,9 @@ public class BackupManager {
public CompletableFuture<Void> deleteEntireBackup(final AuthenticatedBackupUser backupUser) {
checkBackupLevel(backupUser, BackupLevel.FREE);
final int deletionConcurrency =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().deletionConcurrency();
// Clients only include SVRB data with their messages backup-id
final CompletableFuture<Void> svrbRemoval = switch(backupUser.credentialType()) {
case BackupCredentialType.MESSAGES -> secureValueRecoveryBClient.removeData(svrbIdentifier(backupUser));
@@ -467,7 +468,7 @@ public class BackupManager {
// If there was already a pending swap, try to delete the cdn objects directly
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class, e ->
AsyncTimerUtil.record(SYNCHRONOUS_DELETE_TIMER, () ->
deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY)))));
deletePrefix(backupUser.backupDir(), deletionConcurrency)))));
}
@@ -482,11 +483,13 @@ public class BackupManager {
.withDescription("unsupported media cdn provided")
.asRuntimeException();
}
final DynamicBackupConfiguration backupConfiguration =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration();
return Flux.usingWhen(
// Gather usage updates into the UsageBatcher so we don't have to update our backup record on every delete
Mono.just(new UsageBatcher()),
Mono.just(new UsageBatcher(backupConfiguration.usageCheckpointCount())),
// Deletes the objects, returning their former location. Tracks bytes removed so the quota can be updated on
// completion
@@ -495,7 +498,7 @@ public class BackupManager {
// Delete the objects, allowing DELETION_CONCURRENCY operations out at a time
.flatMapSequential(
sd -> Mono.fromCompletionStage(remoteStorageManager.delete(cdnMediaPath(backupUser, sd.key()))),
DELETION_CONCURRENCY)
backupConfiguration.deletionConcurrency())
.zipWithIterable(storageDescriptors)
// Track how much the remote storage manager indicated was deleted as part of the operation
@@ -531,9 +534,14 @@ public class BackupManager {
*/
private static class UsageBatcher {
private final int usageCheckpointCount;
private long runningCountDelta = 0;
private long runningBytesDelta = 0;
UsageBatcher(int usageCheckpointCount) {
this.usageCheckpointCount = usageCheckpointCount;
}
record UsageUpdate(long countDelta, long bytesDelta) {}
/**
@@ -546,7 +554,7 @@ public class BackupManager {
boolean update(long bytesDelta) {
this.runningCountDelta += Long.signum(bytesDelta);
this.runningBytesDelta += bytesDelta;
return Math.abs(runningCountDelta) >= USAGE_CHECKPOINT_COUNT;
return Math.abs(runningCountDelta) >= usageCheckpointCount;
}
/**

View File

@@ -0,0 +1,41 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import java.time.Duration;
/**
*
* @param deletionConcurrency How many cdn object deletion requests can be outstanding at a time per backup deletion operation
* @param copyConcurrency How many cdn object copy requests can be outstanding at a time per batch copy-to-backup operation
* @param usageCheckpointCount When doing batch operations, how often persist usage deltas
* @param maxQuotaStaleness The maximum age of a quota estimate that can be used to enforce a quota limit
*/
public record DynamicBackupConfiguration(
Integer deletionConcurrency,
Integer copyConcurrency,
Integer usageCheckpointCount,
Duration maxQuotaStaleness) {
public DynamicBackupConfiguration {
if (deletionConcurrency == null) {
deletionConcurrency = 10;
}
if (copyConcurrency == null) {
copyConcurrency = 10;
}
if (usageCheckpointCount == null) {
usageCheckpointCount = 10;
}
if (maxQuotaStaleness == null) {
maxQuotaStaleness = Duration.ofSeconds(10);
}
}
public DynamicBackupConfiguration() {
this(null, null, null, null);
}
}

View File

@@ -64,6 +64,10 @@ public class DynamicConfiguration {
@Valid
DynamicRestDeprecationConfiguration restDeprecation = new DynamicRestDeprecationConfiguration(Map.of());
@JsonProperty
@Valid
private DynamicBackupConfiguration backup = new DynamicBackupConfiguration();
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@@ -114,4 +118,7 @@ public class DynamicConfiguration {
return restDeprecation;
}
public DynamicBackupConfiguration getBackupConfiguration() {
return backup;
}
}

View File

@@ -300,7 +300,8 @@ record CommandDependencies(
configuration.getCdn3StorageManagerConfiguration()),
secureValueRecoveryBCredentialsGenerator,
secureValueRecoveryBClient,
clock);
clock,
dynamicConfigurationManager);
final IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
configuration.getDynamoDbTables().getIssuedReceipts().getTableName(),