Make max total backup media configurable

This commit is contained in:
Ravi Khadiwala
2025-09-15 10:29:10 -05:00
committed by ravi-signal
parent e50dcd185d
commit 35ffb208e3
9 changed files with 115 additions and 74 deletions

View File

@@ -1142,7 +1142,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
zkReceiptOperations, issuedReceiptsManager);
commonControllers.add(new SubscriptionController(clock, config.getSubscription(), config.getOneTimeDonations(),
subscriptionManager, stripeManager, braintreeManager, googlePlayBillingManager, appleAppStoreManager,
profileBadgeConverter, bankMandateTranslator));
profileBadgeConverter, bankMandateTranslator, dynamicConfigurationManager));
commonControllers.add(new OneTimeDonationController(clock, config.getOneTimeDonations(), stripeManager, braintreeManager,
zkReceiptOperations, issuedReceiptsManager, oneTimeDonationsManager));
}

View File

@@ -17,6 +17,7 @@ import java.security.SecureRandom;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.HexFormat;
import java.util.List;
@@ -60,7 +61,6 @@ import reactor.core.scheduler.Scheduler;
public class BackupManager {
static final String MESSAGE_BACKUP_NAME = "messageBackup";
public static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = DataSize.gibibytes(100).toBytes();
public static final long MAX_MESSAGE_BACKUP_OBJECT_SIZE = DataSize.mebibytes(101).toBytes();
public static final long MAX_MEDIA_OBJECT_SIZE = DataSize.mebibytes(101).toBytes();
@@ -74,6 +74,10 @@ public class BackupManager {
private static final Timer SYNCHRONOUS_DELETE_TIMER =
Metrics.timer(MetricsUtil.name(BackupManager.class, "synchronousDelete"));
private static final String NUM_OBJECTS_SUMMARY_NAME = MetricsUtil.name(BackupsDb.class, "numObjects");
private static final String BYTES_USED_SUMMARY_NAME = MetricsUtil.name(BackupsDb.class, "bytesUsed");
private static final String BACKUPS_COUNTER_NAME = MetricsUtil.name(BackupsDb.class, "backups");
private static final String SUCCESS_TAG_NAME = "success";
private static final String FAILURE_REASON_TAG_NAME = "reason";
@@ -161,11 +165,42 @@ public class BackupManager {
final AuthenticatedBackupUser backupUser) {
checkBackupLevel(backupUser, BackupLevel.FREE);
checkBackupCredentialType(backupUser, BackupCredentialType.MESSAGES);
final Instant today = clock.instant().truncatedTo(ChronoUnit.DAYS);
final long maxTotalMediaSize =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxTotalMediaSize();
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return backupsDb
.addMessageBackup(backupUser)
.thenApply(result -> cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser)));
.thenApply(storedBackupAttributes -> {
final Instant previousRefreshTime = storedBackupAttributes.lastRefresh();
// Only publish a metric update once per day
if (previousRefreshTime.isBefore(today)) {
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
Tag.of("tier", backupUser.backupLevel().name()));
DistributionSummary.builder(NUM_OBJECTS_SUMMARY_NAME)
.tags(tags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(storedBackupAttributes.numObjects());
DistributionSummary.builder(BYTES_USED_SUMMARY_NAME)
.tags(tags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(storedBackupAttributes.bytesUsed());
// Report that the backup is out of quota if it cannot store a max size media object
final boolean quotaExhausted = storedBackupAttributes.bytesUsed() >=
(maxTotalMediaSize - BackupManager.MAX_MEDIA_OBJECT_SIZE);
Metrics.counter(BACKUPS_COUNTER_NAME,
tags.and("quotaExhausted", String.valueOf(quotaExhausted)))
.increment();
}
return cdn3BackupCredentialGenerator.generateUpload(cdnMessageBackupName(backupUser));
});
}
public CompletableFuture<BackupUploadDescriptor> createTemporaryAttachmentUploadDescriptor(
@@ -312,12 +347,14 @@ public class BackupManager {
})
.sum();
final Duration maxQuotaStaleness =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxQuotaStaleness();
final DynamicBackupConfiguration backupConfiguration =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration();
final Duration maxQuotaStaleness = backupConfiguration.maxQuotaStaleness();
final long maxTotalMediaSize = backupConfiguration.maxTotalMediaSize();
return backupsDb.getMediaUsage(backupUser)
.thenComposeAsync(info -> {
long remainingQuota = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed();
long remainingQuota = maxTotalMediaSize - info.usageInfo().bytesUsed();
final boolean canStore = remainingQuota >= totalBytesAdded;
if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(maxQuotaStaleness))) {
return CompletableFuture.completedFuture(remainingQuota);
@@ -336,7 +373,7 @@ public class BackupManager {
Tag.of("usageChanged", String.valueOf(usageChanged))))
.increment();
})
.thenApply(newUsage -> MAX_TOTAL_BACKUP_MEDIA_BYTES - newUsage.bytesUsed());
.thenApply(newUsage -> maxTotalMediaSize - newUsage.bytesUsed());
})
.thenApply(remainingQuota -> {
// Figure out how many of the requested objects fit in the remaining quota

View File

@@ -87,10 +87,6 @@ public class BackupsDb {
private final SecureRandom secureRandom;
private static final String NUM_OBJECTS_SUMMARY_NAME = MetricsUtil.name(BackupsDb.class, "numObjects");
private static final String BYTES_USED_SUMMARY_NAME = MetricsUtil.name(BackupsDb.class, "bytesUsed");
private static final String BACKUPS_COUNTER_NAME = MetricsUtil.name(BackupsDb.class, "backups");
// The backups table
// B: 16 bytes that identifies the backup
@@ -257,12 +253,14 @@ public class BackupsDb {
.thenRun(Util.NOOP);
}
/**
* Track that a backup will be stored for the user
*
* @param backupUser an already authorized backup user
* @return A future that completes with the attributes of the backup before the update
*/
CompletableFuture<Void> addMessageBackup(final AuthenticatedBackupUser backupUser) {
CompletableFuture<StoredBackupAttributes> addMessageBackup(final AuthenticatedBackupUser backupUser) {
final Instant today = clock.instant().truncatedTo(ChronoUnit.DAYS);
// this could race with concurrent updates, but the only effect would be last-writer-wins on the timestamp
return dynamoClient.updateItem(
@@ -272,40 +270,7 @@ public class BackupsDb {
.updateItemBuilder()
.returnValues(ReturnValue.ALL_OLD)
.build())
.thenAccept(updateItemResponse ->
updateMetricsAfterUpload(backupUser, today, updateItemResponse.attributes()));
}
private void updateMetricsAfterUpload(final AuthenticatedBackupUser backupUser, final Instant today, final Map<String, AttributeValue> item) {
final Instant previousRefreshTime = Instant.ofEpochSecond(
AttributeValues.getLong(item, ATTR_LAST_REFRESH, 0L));
// Only publish a metric update once per day
if (previousRefreshTime.isBefore(today)) {
final long mediaCount = AttributeValues.getLong(item, ATTR_MEDIA_COUNT, 0L);
final long bytesUsed = AttributeValues.getLong(item, ATTR_MEDIA_BYTES_USED, 0L);
final Tags tags = Tags.of(
UserAgentTagUtil.getPlatformTag(backupUser.userAgent()),
Tag.of("tier", backupUser.backupLevel().name()));
DistributionSummary.builder(NUM_OBJECTS_SUMMARY_NAME)
.tags(tags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(mediaCount);
DistributionSummary.builder(BYTES_USED_SUMMARY_NAME)
.tags(tags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(bytesUsed);
// Report that the backup is out of quota if it cannot store a max size media object
final boolean quotaExhausted = bytesUsed >=
(BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES - BackupManager.MAX_MEDIA_OBJECT_SIZE);
Metrics.counter(BACKUPS_COUNTER_NAME,
tags.and("quotaExhausted", String.valueOf(quotaExhausted)))
.increment();
}
.thenApply(updateItemResponse -> fromItem(updateItemResponse.attributes()));
}
/**
@@ -520,14 +485,18 @@ public class BackupsDb {
// Don't use the SDK's item publisher, works around https://github.com/aws/aws-sdk-java-v2/issues/6411
.concatMap(page -> Flux.fromIterable(page.items()))
.filter(item -> item.containsKey(KEY_BACKUP_ID_HASH))
.map(item -> new StoredBackupAttributes(
AttributeValues.getByteArray(item, KEY_BACKUP_ID_HASH, null),
AttributeValues.getString(item, ATTR_BACKUP_DIR, null),
AttributeValues.getString(item, ATTR_MEDIA_DIR, null),
Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_REFRESH, 0L)),
Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_MEDIA_REFRESH, 0L)),
AttributeValues.getLong(item, ATTR_MEDIA_BYTES_USED, 0L),
AttributeValues.getLong(item, ATTR_MEDIA_COUNT, 0L)));
.map(BackupsDb::fromItem);
}
private static StoredBackupAttributes fromItem(Map<String, AttributeValue> item) {
return new StoredBackupAttributes(
AttributeValues.getByteArray(item, KEY_BACKUP_ID_HASH, null),
AttributeValues.getString(item, ATTR_BACKUP_DIR, null),
AttributeValues.getString(item, ATTR_MEDIA_DIR, null),
Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_REFRESH, 0L)),
Instant.ofEpochSecond(AttributeValues.getLong(item, ATTR_LAST_MEDIA_REFRESH, 0L)),
AttributeValues.getLong(item, ATTR_MEDIA_BYTES_USED, 0L),
AttributeValues.getLong(item, ATTR_MEDIA_COUNT, 0L));
}
Flux<ExpiredBackup> getExpiredBackups(final int segments, final Scheduler scheduler, final Instant purgeTime) {

View File

@@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.configuration.dynamic;
import io.dropwizard.util.DataSize;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
/**
@@ -13,12 +15,14 @@ import java.time.Duration;
* @param copyConcurrency How many cdn object copy requests can be outstanding at a time per batch copy-to-backup operation
* @param usageCheckpointCount When doing batch operations, how often persist usage deltas
* @param maxQuotaStaleness The maximum age of a quota estimate that can be used to enforce a quota limit
* @param maxTotalMediaSize The number of media bytes a paid-tier user may store
*/
public record DynamicBackupConfiguration(
Integer deletionConcurrency,
Integer copyConcurrency,
Integer usageCheckpointCount,
Duration maxQuotaStaleness) {
@NotNull Integer deletionConcurrency,
@NotNull Integer copyConcurrency,
@NotNull Integer usageCheckpointCount,
@NotNull Duration maxQuotaStaleness,
@NotNull Long maxTotalMediaSize) {
public DynamicBackupConfiguration {
if (deletionConcurrency == null) {
@@ -33,9 +37,12 @@ public record DynamicBackupConfiguration(
if (maxQuotaStaleness == null) {
maxQuotaStaleness = Duration.ofSeconds(10);
}
if (maxTotalMediaSize == null) {
maxTotalMediaSize = DataSize.gibibytes(100).toBytes();
}
}
public DynamicBackupConfiguration() {
this(null, null, null, null);
this(null, null, null, null, null);
}
}

View File

@@ -65,11 +65,13 @@ import org.whispersystems.textsecuregcm.configuration.OneTimeDonationConfigurati
import org.whispersystems.textsecuregcm.configuration.OneTimeDonationCurrencyConfiguration;
import org.whispersystems.textsecuregcm.configuration.SubscriptionConfiguration;
import org.whispersystems.textsecuregcm.configuration.SubscriptionLevelConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.Badge;
import org.whispersystems.textsecuregcm.entities.PurchasableBadge;
import org.whispersystems.textsecuregcm.mappers.SubscriptionExceptionMapper;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.PaymentTime;
import org.whispersystems.textsecuregcm.storage.SubscriberCredentials;
import org.whispersystems.textsecuregcm.subscriptions.SubscriptionException;
@@ -109,6 +111,7 @@ public class SubscriptionController {
private final AppleAppStoreManager appleAppStoreManager;
private final BadgeTranslator badgeTranslator;
private final BankMandateTranslator bankMandateTranslator;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
static final String RECEIPT_ISSUED_COUNTER_NAME = MetricsUtil.name(SubscriptionController.class, "receiptIssued");
static final String PROCESSOR_TAG_NAME = "processor";
static final String TYPE_TAG_NAME = "type";
@@ -124,7 +127,8 @@ public class SubscriptionController {
@Nonnull GooglePlayBillingManager googlePlayBillingManager,
@Nonnull AppleAppStoreManager appleAppStoreManager,
@Nonnull BadgeTranslator badgeTranslator,
@Nonnull BankMandateTranslator bankMandateTranslator) {
@Nonnull BankMandateTranslator bankMandateTranslator,
@NotNull DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.subscriptionManager = subscriptionManager;
this.clock = Objects.requireNonNull(clock);
this.subscriptionConfiguration = Objects.requireNonNull(subscriptionConfiguration);
@@ -135,6 +139,7 @@ public class SubscriptionController {
this.appleAppStoreManager = appleAppStoreManager;
this.badgeTranslator = Objects.requireNonNull(badgeTranslator);
this.bankMandateTranslator = Objects.requireNonNull(bankMandateTranslator);
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
private Map<String, CurrencyConfiguration> buildCurrencyConfiguration() {
@@ -207,12 +212,14 @@ public class SubscriptionController {
giftBadge,
oneTimeDonationConfiguration.gift().expiration())));
final long maxTotalBackupMediaBytes =
dynamicConfigurationManager.getConfiguration().getBackupConfiguration().maxTotalMediaSize();
final Map<String, BackupLevelConfiguration> backupLevels = subscriptionConfiguration.getBackupLevels()
.entrySet().stream()
.collect(Collectors.toMap(
e -> String.valueOf(e.getKey()),
e -> new BackupLevelConfiguration(
BackupManager.MAX_TOTAL_BACKUP_MEDIA_BYTES,
maxTotalBackupMediaBytes,
e.getValue().playProductId(),
e.getValue().mediaTtl().toDays())));

View File

@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment;
import java.time.Clock;
import java.util.Base64;
import java.util.Optional;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
@@ -75,4 +76,17 @@ public class ClearIssuedReceiptRedemptionsCommand extends AbstractCommandWithDep
throw new RuntimeException(ex);
}
}
public static void main(String[] args) throws Exception {
final String subscriberId = "7ywqmymkSMBkBi9v06Iy4AN8DiN_lg8gHXM8TSpO0Z0";
final SubscriberCredentials creds = SubscriberCredentials
.process(Optional.empty(), subscriberId, Clock.systemUTC());
System.out.println(Base64.getEncoder().encodeToString(creds.subscriberUser()));
final String pc = "AWN1c19TV3hDUEhlWDBldzB0UA==";
final byte[] bc = Base64.getDecoder().decode(pc);
System.out.println(bc[0]);
System.out.println(new String(bc, 1, bc.length - 1));
}
}