diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3fc461cf4..b9188b5f7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -806,7 +806,8 @@ public class WhisperServerService extends Application dynamicConfigurationManager; public BackupManager( final BackupsDb backupsDb, @@ -110,7 +100,8 @@ public class BackupManager { final RemoteStorageManager remoteStorageManager, final ExternalServiceCredentialsGenerator secureValueRecoveryBCredentialsGenerator, final SecureValueRecoveryClient secureValueRecoveryBClient, - final Clock clock) { + final Clock clock, + final DynamicConfigurationManager dynamicConfigurationManager) { this.backupsDb = backupsDb; this.serverSecretParams = serverSecretParams; this.rateLimiters = rateLimiters; @@ -120,6 +111,7 @@ public class BackupManager { this.secureValueRecoveryBClient = secureValueRecoveryBClient; this.clock = clock; this.secureValueRecoveryBCredentialsGenerator = secureValueRecoveryBCredentialsGenerator; + this.dynamicConfigurationManager = dynamicConfigurationManager; } @@ -238,6 +230,9 @@ public class BackupManager { checkBackupLevel(backupUser, BackupLevel.PAID); checkBackupCredentialType(backupUser, BackupCredentialType.MEDIA); + final DynamicBackupConfiguration backupConfiguration = + dynamicConfigurationManager.getConfiguration().getBackupConfiguration(); + return Mono.fromFuture(() -> allowedCopies(backupUser, toCopy)) .flatMapMany(quotaResult -> Flux.concat( @@ -247,7 +242,7 @@ public class BackupManager { Flux.fromIterable(quotaResult.requestsToCopy()) // Update the usage in reasonable chunk sizes to bound how out of sync our claimed and actual usage gets - .buffer(USAGE_CHECKPOINT_COUNT) + .buffer(backupConfiguration.usageCheckpointCount()) 
.concatMap(copyParameters -> { final long quotaToConsume = copyParameters.stream() .mapToLong(CopyParameters::destinationObjectSize) @@ -265,7 +260,7 @@ public class BackupManager { .fromFuture(this.backupsDb.trackMedia(backupUser, -1, -copyParams.destinationObjectSize())) .thenReturn(copyResult); }), - COPY_CONCURRENCY, 1), + backupConfiguration.copyConcurrency(), 1), // There wasn't enough quota remaining to perform these copies Flux.fromIterable(quotaResult.requestsToReject()) @@ -317,11 +312,14 @@ public class BackupManager { }) .sum(); + final Duration maxQuotaStaleness = + dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxQuotaStaleness(); + return backupsDb.getMediaUsage(backupUser) .thenComposeAsync(info -> { long remainingQuota = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed(); final boolean canStore = remainingQuota >= totalBytesAdded; - if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(MAX_QUOTA_STALENESS))) { + if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(maxQuotaStaleness))) { return CompletableFuture.completedFuture(remainingQuota); } @@ -456,6 +454,9 @@ public class BackupManager { public CompletableFuture deleteEntireBackup(final AuthenticatedBackupUser backupUser) { checkBackupLevel(backupUser, BackupLevel.FREE); + final int deletionConcurrency = + dynamicConfigurationManager.getConfiguration().getBackupConfiguration().deletionConcurrency(); + // Clients only include SVRB data with their messages backup-id final CompletableFuture svrbRemoval = switch(backupUser.credentialType()) { case BackupCredentialType.MESSAGES -> secureValueRecoveryBClient.removeData(svrbIdentifier(backupUser)); @@ -467,7 +468,7 @@ public class BackupManager { // If there was already a pending swap, try to delete the cdn objects directly .exceptionallyCompose(ExceptionUtils.exceptionallyHandler(BackupsDb.PendingDeletionException.class, e -> 
AsyncTimerUtil.record(SYNCHRONOUS_DELETE_TIMER, () -> - deletePrefix(backupUser.backupDir(), DELETION_CONCURRENCY))))); + deletePrefix(backupUser.backupDir(), deletionConcurrency))))); } @@ -482,11 +483,13 @@ public class BackupManager { .withDescription("unsupported media cdn provided") .asRuntimeException(); } + final DynamicBackupConfiguration backupConfiguration = + dynamicConfigurationManager.getConfiguration().getBackupConfiguration(); return Flux.usingWhen( // Gather usage updates into the UsageBatcher so we don't have to update our backup record on every delete - Mono.just(new UsageBatcher()), + Mono.just(new UsageBatcher(backupConfiguration.usageCheckpointCount())), // Deletes the objects, returning their former location. Tracks bytes removed so the quota can be updated on // completion @@ -495,7 +498,7 @@ public class BackupManager { // Delete the objects, allowing DELETION_CONCURRENCY operations out at a time .flatMapSequential( sd -> Mono.fromCompletionStage(remoteStorageManager.delete(cdnMediaPath(backupUser, sd.key()))), - DELETION_CONCURRENCY) + backupConfiguration.deletionConcurrency()) .zipWithIterable(storageDescriptors) // Track how much the remote storage manager indicated was deleted as part of the operation @@ -531,9 +534,14 @@ public class BackupManager { */ private static class UsageBatcher { + private final int usageCheckpointCount; private long runningCountDelta = 0; private long runningBytesDelta = 0; + UsageBatcher(int usageCheckpointCount) { + this.usageCheckpointCount = usageCheckpointCount; + } + record UsageUpdate(long countDelta, long bytesDelta) {} /** @@ -546,7 +554,7 @@ public class BackupManager { boolean update(long bytesDelta) { this.runningCountDelta += Long.signum(bytesDelta); this.runningBytesDelta += bytesDelta; - return Math.abs(runningCountDelta) >= USAGE_CHECKPOINT_COUNT; + return Math.abs(runningCountDelta) >= usageCheckpointCount; } /** diff --git 
a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicBackupConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicBackupConfiguration.java new file mode 100644 index 000000000..2962c942e --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicBackupConfiguration.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.configuration.dynamic; + +import java.time.Duration; + +/** + * Dynamically configurable tuning parameters for backup operations. Any component passed as {@code null} is replaced by its default in the canonical constructor (10 for each count, 10 seconds for the staleness bound), so the no-argument constructor yields all defaults; no other validation is performed here. + * @param deletionConcurrency How many cdn object deletion requests can be outstanding at a time per backup deletion operation + * @param copyConcurrency How many cdn object copy requests can be outstanding at a time per batch copy-to-backup operation + * @param usageCheckpointCount When doing batch operations, how often to persist usage deltas + * @param maxQuotaStaleness The maximum age of a quota estimate that can be used to enforce a quota limit + */ +public record DynamicBackupConfiguration( + Integer deletionConcurrency, + Integer copyConcurrency, + Integer usageCheckpointCount, + Duration maxQuotaStaleness) { + + public DynamicBackupConfiguration { + if (deletionConcurrency == null) { + deletionConcurrency = 10; + } + if (copyConcurrency == null) { + copyConcurrency = 10; + } + if (usageCheckpointCount == null) { + usageCheckpointCount = 10; + } + if (maxQuotaStaleness == null) { + maxQuotaStaleness = Duration.ofSeconds(10); + } + } + + public DynamicBackupConfiguration() { + this(null, null, null, null); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java index 79c3b211f..1587c459f 100644 --- 
a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java @@ -64,6 +64,10 @@ public class DynamicConfiguration { @Valid DynamicRestDeprecationConfiguration restDeprecation = new DynamicRestDeprecationConfiguration(Map.of()); + @JsonProperty + @Valid + private DynamicBackupConfiguration backup = new DynamicBackupConfiguration(); + public Optional getExperimentEnrollmentConfiguration( final String experimentName) { return Optional.ofNullable(experiments.get(experimentName)); @@ -114,4 +118,7 @@ public class DynamicConfiguration { return restDeprecation; } + public DynamicBackupConfiguration getBackupConfiguration() { + return backup; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 456414aa3..c4a80c959 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -300,7 +300,8 @@ record CommandDependencies( configuration.getCdn3StorageManagerConfiguration()), secureValueRecoveryBCredentialsGenerator, secureValueRecoveryBClient, - clock); + clock, + dynamicConfigurationManager); final IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager( configuration.getDynamoDbTables().getIssuedReceipts().getTableName(), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java index a613fec6c..e59ae13d0 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/backup/BackupManagerTest.java @@ -73,10 
+73,13 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentials; import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialsGenerator; import org.whispersystems.textsecuregcm.configuration.SecureValueRecoveryConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicBackupConfiguration; +import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; import org.whispersystems.textsecuregcm.limits.RateLimiter; import org.whispersystems.textsecuregcm.limits.RateLimiters; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient; +import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager; import org.whispersystems.textsecuregcm.storage.DynamoDbExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema; import org.whispersystems.textsecuregcm.util.AttributeValues; @@ -109,6 +112,8 @@ public class BackupManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final byte[] backupKey = TestRandomUtil.nextBytes(32); private final UUID aci = UUID.randomUUID(); + private final DynamicBackupConfiguration backupConfiguration = new DynamicBackupConfiguration( + 3, 4, 5, Duration.ofSeconds(30)); private static final SecureValueRecoveryConfiguration CFG = new SecureValueRecoveryConfiguration( @@ -139,6 +144,12 @@ public class BackupManagerTest { DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), DynamoDbExtensionSchema.Tables.BACKUPS.tableName(), testClock); + @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = + mock(DynamicConfigurationManager.class); + final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); + 
when(dynamicConfiguration.getBackupConfiguration()).thenReturn(backupConfiguration); + when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); + this.backupManager = new BackupManager( backupsDb, backupAuthTestUtil.params, @@ -148,7 +159,8 @@ public class BackupManagerTest { remoteStorageManager, svrbCredentialGenerator, svrbClient, - testClock); + testClock, + dynamicConfigurationManager); } @ParameterizedTest @@ -490,7 +502,7 @@ public class BackupManagerTest { .map(source -> new CopyParameters(3, source, 100, COPY_ENCRYPTION_PARAM, TestRandomUtil.nextBytes(15))) .toList(); - final int slowIndex = BackupManager.USAGE_CHECKPOINT_COUNT - 1; + final int slowIndex = backupConfiguration.usageCheckpointCount() - 1; final CompletableFuture slow = new CompletableFuture<>(); when(remoteStorageManager.copy(eq(3), anyString(), eq(100), any(), anyString())) .thenReturn(CompletableFuture.completedFuture(null)); @@ -511,11 +523,11 @@ public class BackupManagerTest { final long bytesPerObject = COPY_ENCRYPTION_PARAM.outputSize(100); assertThat(backupsDb.getMediaUsage(backupUser).join().usageInfo()).isIn( new UsageInfo( - bytesPerObject * BackupManager.USAGE_CHECKPOINT_COUNT, - BackupManager.USAGE_CHECKPOINT_COUNT), + bytesPerObject * backupConfiguration.usageCheckpointCount(), + backupConfiguration.usageCheckpointCount()), new UsageInfo( - 2 * bytesPerObject * BackupManager.USAGE_CHECKPOINT_COUNT, - 2 * BackupManager.USAGE_CHECKPOINT_COUNT)); + 2 * bytesPerObject * backupConfiguration.usageCheckpointCount(), + 2L * backupConfiguration.usageCheckpointCount())); // We should still be waiting since we have a slow delete assertThat(future).isNotDone(); @@ -590,7 +602,7 @@ public class BackupManagerTest { backupsDb.setMediaUsage(backupUser, new UsageInfo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES, 1000)).join(); // check still within staleness bound (t=0 + 1 day - 1 sec) testClock.pin(Instant.ofEpochSecond(0) - .plus(BackupManager.MAX_QUOTA_STALENESS) 
+ .plus(backupConfiguration.maxQuotaStaleness()) .minus(Duration.ofSeconds(1))); // Try to copy @@ -611,7 +623,7 @@ public class BackupManagerTest { // set the backupsDb to be totally out of quota at t=0 testClock.pin(Instant.ofEpochSecond(0)); backupsDb.setMediaUsage(backupUser, new UsageInfo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES, 1000)).join(); - testClock.pin(Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS)); + testClock.pin(Instant.ofEpochSecond(0).plus(backupConfiguration.maxQuotaStaleness())); // Should recalculate quota and copy can succeed assertThat(copy(backupUser).outcome()).isEqualTo(CopyResult.Outcome.SUCCESS); @@ -619,7 +631,7 @@ public class BackupManagerTest { // backupsDb should have the new value final BackupsDb.TimestampedUsageInfo info = backupsDb.getMediaUsage(backupUser).join(); assertThat(info.lastRecalculationTime()) - .isEqualTo(Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS)); + .isEqualTo(Instant.ofEpochSecond(0).plus(backupConfiguration.maxQuotaStaleness())); assertThat(info.usageInfo().bytesUsed()).isEqualTo(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES); assertThat(info.usageInfo().numObjects()).isEqualTo(1001); } @@ -643,7 +655,7 @@ public class BackupManagerTest { backupsDb.setMediaUsage(backupUser, new UsageInfo(originalRemainingSpace, 1000)).join(); if (doesReaclc) { - testClock.pin(Instant.ofEpochSecond(0).plus(BackupManager.MAX_QUOTA_STALENESS).plus(Duration.ofSeconds(1))); + testClock.pin(Instant.ofEpochSecond(0).plus(backupConfiguration.maxQuotaStaleness()).plus(Duration.ofSeconds(1))); when(remoteStorageManager.calculateBytesUsed(eq(backupMediaPrefix))) .thenReturn(CompletableFuture.completedFuture(new UsageInfo(afterRecalcRemainingSpace, 1000))); } @@ -806,7 +818,7 @@ public class BackupManagerTest { final String slowMediaKey = "%s/%s/%s".formatted( backupUser.backupDir(), backupUser.mediaDir(), - BackupManager.encodeMediaIdForCdn(mediaIds.get(BackupManager.USAGE_CHECKPOINT_COUNT + 3))); + 
BackupManager.encodeMediaIdForCdn(mediaIds.get(backupConfiguration.usageCheckpointCount() + 3))); when(remoteStorageManager.delete(anyString())).thenReturn(CompletableFuture.completedFuture(2L)); when(remoteStorageManager.delete(slowMediaKey)).thenReturn(slowFuture); @@ -819,21 +831,21 @@ public class BackupManagerTest { .toList()); final ArrayBlockingQueue sds = new ArrayBlockingQueue<>(100); final CompletableFuture future = flux.doOnNext(sds::add).then().toFuture(); - for (int i = 0; i < BackupManager.USAGE_CHECKPOINT_COUNT; i++) { + for (int i = 0; i < backupConfiguration.usageCheckpointCount(); i++) { sds.poll(1, TimeUnit.SECONDS); } assertThat(backupsDb.getMediaUsage(backupUser).join().usageInfo()) .isEqualTo(new UsageInfo( - 200 - (2 * BackupManager.USAGE_CHECKPOINT_COUNT), - 100 - BackupManager.USAGE_CHECKPOINT_COUNT)); + 200 - (2L * backupConfiguration.usageCheckpointCount()), + 100 - backupConfiguration.usageCheckpointCount())); // We should still be waiting since we have a slow delete assertThat(future).isNotDone(); // But we should checkpoint the usage periodically assertThat(backupsDb.getMediaUsage(backupUser).join().usageInfo()) .isEqualTo(new UsageInfo( - 200 - (2 * BackupManager.USAGE_CHECKPOINT_COUNT), - 100 - BackupManager.USAGE_CHECKPOINT_COUNT)); + 200 - (2L * backupConfiguration.usageCheckpointCount()), + 100 - backupConfiguration.usageCheckpointCount())); slowFuture.complete(2L); future.join();