Add archive listing

This commit is contained in:
ravi-signal
2024-01-08 13:54:57 -06:00
committed by GitHub
parent 460dc6224c
commit b6ecfc7131
21 changed files with 798 additions and 258 deletions

View File

@@ -5,15 +5,19 @@
package org.whispersystems.textsecuregcm.backup;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Status;
import io.micrometer.core.instrument.Metrics;
import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.util.Base64;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.signal.libsignal.protocol.InvalidKeyException;
@@ -28,14 +32,20 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
public class BackupManager {
private static final Logger logger = LoggerFactory.getLogger(BackupManager.class);
static final String MEDIA_DIRECTORY_NAME = "media";
static final String MESSAGE_BACKUP_NAME = "messageBackup";
private static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = 1024L * 1024L * 1024L * 50L;
private static final long MAX_MEDIA_OBJECT_SIZE = 1024L * 1024L * 101L;
static final long MAX_TOTAL_BACKUP_MEDIA_BYTES = 1024L * 1024L * 1024L * 50L;
static final long MAX_MEDIA_OBJECT_SIZE = 1024L * 1024L * 101L;
// If the last media usage recalculation is over MAX_QUOTA_STALENESS, force a recalculation before quota enforcement.
static final Duration MAX_QUOTA_STALENESS = Duration.ofDays(1);
private static final String ZK_AUTHN_COUNTER_NAME = MetricsUtil.name(BackupManager.class, "authentication");
private static final String ZK_AUTHZ_FAILURE_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"authorizationFailure");
private static final String USAGE_RECALCULATION_COUNTER_NAME = MetricsUtil.name(BackupManager.class,
"usageRecalculation");
private static final String SUCCESS_TAG_NAME = "success";
private static final String FAILURE_REASON_TAG_NAME = "reason";
@@ -175,22 +185,41 @@ public class BackupManager {
.withDescription("credential does not support storing media")
.asRuntimeException();
}
return backupsDb.describeBackup(backupUser)
.thenApply(info -> info.mediaUsedSpace()
.filter(usedSpace -> MAX_TOTAL_BACKUP_MEDIA_BYTES - usedSpace >= mediaLength)
.isPresent());
return backupsDb.getMediaUsage(backupUser)
.thenComposeAsync(info -> {
final boolean canStore = MAX_TOTAL_BACKUP_MEDIA_BYTES - info.usageInfo().bytesUsed() >= mediaLength;
if (canStore || info.lastRecalculationTime().isAfter(clock.instant().minus(MAX_QUOTA_STALENESS))) {
return CompletableFuture.completedFuture(canStore);
}
// The user is out of quota, and we have not recently recalculated the user's usage. Double check by doing a
// hard recalculation before actually forbidding the user from storing additional media.
final String mediaPrefix = "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
return this.remoteStorageManager.calculateBytesUsed(mediaPrefix)
.thenCompose(usage -> backupsDb
.setMediaUsage(backupUser, usage)
.thenApply(ignored -> usage.bytesUsed()))
.whenComplete((newUsage, throwable) -> {
boolean usageChanged = throwable == null && !newUsage.equals(info.usageInfo());
Metrics.counter(USAGE_RECALCULATION_COUNTER_NAME, "usageChanged", String.valueOf(usageChanged))
.increment();
})
.thenApply(usedSpace -> MAX_TOTAL_BACKUP_MEDIA_BYTES - usedSpace >= mediaLength);
});
}
public record StorageDescriptor(int cdn, byte[] key) {}
public record StorageDescriptorWithLength(int cdn, byte[] key, long length) {}
/**
* Copy an encrypted object to the backup cdn, adding a layer of encryption
* <p>
* Implementation notes: <p> This method guarantees that any object that gets successfully copied to the backup cdn
* will also have an entry for the user in the database. <p>
* will also be deducted from the user's quota. </p>
* <p>
* However, the converse isn't true; there may be entries in the database that have not made it to the cdn. On list,
* these entries are checked against the cdn and removed.
* However, the converse isn't true. It's possible we may charge the user for media they failed to copy. As a result,
* the quota may be over reported and it should be recalculated before taking quota enforcement actions.
*
* @return A stage that completes successfully with location of the twice-encrypted object on the backup cdn. The
* returned CompletionStage can be completed exceptionally with the following exceptions.
@@ -221,21 +250,27 @@ public class BackupManager {
final MessageBackupUploadDescriptor dst = cdn3BackupCredentialGenerator.generateUpload(
encodeBackupIdForCdn(backupUser),
encodeForCdn(destinationMediaId));
"%s/%s".formatted(MEDIA_DIRECTORY_NAME, encodeForCdn(destinationMediaId)));
final int destinationLength = encryptionParameters.outputSize(sourceLength);
final URI sourceUri = attachmentReadUri(sourceCdn, sourceKey);
return this.backupsDb
// Write the ddb updates before actually updating backing storage
.trackMedia(backupUser, destinationMediaId, sourceLength)
// copy the objects. On a failure, make a best-effort attempt to reverse the ddb transaction. If cleanup fails
// the client may be left with some cleanup to do if they don't eventually upload the media id.
.thenCompose(ignored -> remoteStorageManager
// actually perform the copy
.copy(attachmentReadUri(sourceCdn, sourceKey), sourceLength, encryptionParameters, dst)
// best effort: on failure, untrack the copied media
.exceptionallyCompose(copyError -> backupsDb.untrackMedia(backupUser, destinationMediaId, sourceLength)
.thenCompose(ignoredSuccess -> CompletableFuture.failedFuture(copyError))))
.trackMedia(backupUser, destinationLength)
// Actually copy the objects. If the copy fails, our estimated quota usage may not be exact
.thenComposeAsync(ignored -> remoteStorageManager.copy(sourceUri, sourceLength, encryptionParameters, dst))
.exceptionallyCompose(throwable -> {
final Throwable unwrapped = ExceptionUtils.unwrap(throwable);
if (!(unwrapped instanceof SourceObjectNotFoundException) && !(unwrapped instanceof InvalidLengthException)) {
throw ExceptionUtils.wrap(unwrapped);
}
// In cases where we know the copy fails without writing anything, we can try to restore the user's quota
return this.backupsDb.trackMedia(backupUser, -destinationLength).whenComplete((ignored, ignoredEx) -> {
throw ExceptionUtils.wrap(unwrapped);
});
})
// indicates where the backup was stored
.thenApply(ignore -> new StorageDescriptor(dst.cdn(), destinationMediaId));
@@ -268,12 +303,55 @@ public class BackupManager {
throw Status.PERMISSION_DENIED
.withDescription("credential does not support read auth operation")
.asRuntimeException();
}
final String encodedBackupId = encodeBackupIdForCdn(backupUser);
return cdn3BackupCredentialGenerator.readHeaders(encodedBackupId);
}
/**
* List of media stored for a particular backup id
*
* @param media A page of media entries
* @param cursor If set, can be passed back to a subsequent list request to resume listing from the previous point
*/
public record ListMediaResult(List<StorageDescriptorWithLength> media, Optional<String> cursor) {}
/**
 * List the media stored by the backupUser
 *
 * @param backupUser An already ZK authenticated backup user
 * @param cursor     A cursor returned by a previous call that can be used to resume listing
 * @param limit      The maximum number of list results to return
 * @return A {@link ListMediaResult}
 */
public CompletionStage<ListMediaResult> list(
    final AuthenticatedBackupUser backupUser,
    final Optional<String> cursor,
    final int limit) {
  if (backupUser.backupTier().compareTo(BackupTier.MESSAGES) < 0) {
    Metrics.counter(ZK_AUTHZ_FAILURE_COUNTER_NAME).increment();
    throw Status.PERMISSION_DENIED
        .withDescription("credential does not support list operation")
        .asRuntimeException();
  }
  // Media objects are uploaded to "<hashedBackupId>/media/<mediaId>" (see the upload descriptor
  // construction and canStoreMedia's recalculation prefix), so the listing prefix must be
  // "<hashedBackupId>/media/". The previous "media/<hashedBackupId>/" order matched nothing.
  final String mediaPrefix = "%s/%s/".formatted(encodeBackupIdForCdn(backupUser), MEDIA_DIRECTORY_NAME);
  return remoteStorageManager.list(mediaPrefix, cursor, limit)
      .thenApply(result ->
          new ListMediaResult(
              result.objects()
                  .stream()
                  .map(entry -> new StorageDescriptorWithLength(
                      remoteStorageManager.cdnNumber(),
                      decodeFromCdn(entry.key()),
                      entry.length()))
                  .toList(),
              result.cursor()));
}
/**
* Authenticate the ZK anonymous backup credential's presentation
* <p>
@@ -369,7 +447,8 @@ public class BackupManager {
});
}
private static String encodeBackupIdForCdn(final AuthenticatedBackupUser backupUser) {
// Derives the CDN path segment for a backup user. Note: the *hashed* backup-id is encoded,
// so the raw backup-id bytes are never exposed in CDN object keys.
@VisibleForTesting
static String encodeBackupIdForCdn(final AuthenticatedBackupUser backupUser) {
return encodeForCdn(BackupsDb.hashedBackupId(backupUser.backupId()));
}
@@ -377,4 +456,8 @@ public class BackupManager {
return Base64.getUrlEncoder().encodeToString(bytes);
}
/** Decodes a url-safe base64 CDN object key back into its raw byte form (inverse of encodeForCdn). */
private static byte[] decodeFromCdn(final String base64) {
  final Base64.Decoder urlDecoder = Base64.getUrlDecoder();
  return urlDecoder.decode(base64);
}
}

View File

@@ -1,6 +1,5 @@
package org.whispersystems.textsecuregcm.backup;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;

View File

@@ -4,6 +4,7 @@ import io.grpc.Status;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -11,7 +12,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,31 +22,24 @@ import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.Delete;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
import software.amazon.awssdk.services.dynamodb.model.Update;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
/**
* Tracks backup metadata in a persistent store.
*
* <p>
* It's assumed that the caller has already validated that the backupUser being operated on has valid credentials and
* possesses the appropriate {@link BackupTier} to perform the current operation.
*/
public class BackupsDb {
private static final Logger logger = LoggerFactory.getLogger(BackupsDb.class);
static final int BACKUP_CDN = 3;
private final DynamoDbAsyncClient dynamoClient;
private final String backupTableName;
private final String backupMediaTableName;
private final Clock clock;
// The backups table
@@ -68,31 +61,25 @@ public class BackupsDb {
public static final String ATTR_MEDIA_COUNT = "MC";
// N: The cdn number where the message backup is stored
public static final String ATTR_CDN = "CDN";
// The stored media table (hashedBackupId, mediaId, cdn, objectLength)
// B: 15-byte mediaId
public static final String KEY_MEDIA_ID = "M";
// N: The length of the encrypted media object
public static final String ATTR_LENGTH = "L";
// N: Time in seconds since epoch of last backup media usage recalculation. This timestamp is updated whenever we
// recalculate the up-to-date bytes used by querying the cdn(s) directly.
public static final String ATTR_MEDIA_USAGE_LAST_RECALCULATION = "MBTS";
public BackupsDb(
final DynamoDbAsyncClient dynamoClient,
final String backupTableName,
final String backupMediaTableName,
final Clock clock) {
this.dynamoClient = dynamoClient;
this.backupTableName = backupTableName;
this.backupMediaTableName = backupMediaTableName;
this.clock = clock;
}
/**
* Set the public key associated with a backupId.
*
* @param authenticatedBackupId The backup-id bytes that should be associated with the provided public key
* @param authenticatedBackupId The backup-id bytes that should be associated with the provided public key
* @param authenticatedBackupTier The backup tier
* @param publicKey The public key to associate with the backup id
* @param publicKey The public key to associate with the backup id
* @return A stage that completes when the public key has been set. If the backup-id already has a set public key that
* does not match, the stage will be completed exceptionally with a {@link PublicKeyConflictException}
*/
@@ -136,103 +123,27 @@ public class BackupsDb {
/**
* Add media to the backup media table and update the quota in the backup table
*
* @param backupUser The
* @param mediaId The mediaId to add
* @param mediaLength The length of the media before encryption (the length of the source media)
* @return A stage that completes successfully once the tables are updated. If the media with the provided id has
* previously been tracked with a different length, the stage will complete exceptionally with an
* {@link InvalidLengthException}
*/
CompletableFuture<Void> trackMedia(
final AuthenticatedBackupUser backupUser,
final byte[] mediaId,
final int mediaLength) {
final byte[] hashedBackupId = hashedBackupId(backupUser);
return dynamoClient
.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(
// Add the media to the media table
TransactWriteItem.builder().put(Put.builder()
.tableName(backupMediaTableName)
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.item(Map.of(
KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId),
KEY_MEDIA_ID, AttributeValues.b(mediaId),
ATTR_CDN, AttributeValues.n(BACKUP_CDN),
ATTR_LENGTH, AttributeValues.n(mediaLength)))
.conditionExpression("attribute_not_exists(#mediaId)")
.expressionAttributeNames(Map.of("#mediaId", KEY_MEDIA_ID))
.build()).build(),
// Update the media quota and TTL
TransactWriteItem.builder().update(
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(clock)
.incrementMediaBytes(mediaLength)
.incrementMediaCount(1)
.transactItemBuilder()
.build()).build()).build())
.exceptionally(throwable -> {
if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException txCancelled) {
final long oldItemLength = conditionCheckFailed(txCancelled, 0)
.flatMap(item -> Optional.ofNullable(item.get(ATTR_LENGTH)))
.map(attr -> Long.parseLong(attr.n()))
.orElseThrow(() -> ExceptionUtils.wrap(throwable));
if (oldItemLength != mediaLength) {
throw new CompletionException(
new InvalidLengthException("Previously tried to copy media with a different length. "
+ "Provided " + mediaLength + " was " + oldItemLength));
}
// The client already "paid" for this media, can let them through
return null;
} else {
// rethrow original exception
throw ExceptionUtils.wrap(throwable);
}
})
.thenRun(Util.NOOP);
}
/**
* Remove media from backup media table and update the quota in the backup table
* Update the quota in the backup table
*
* @param backupUser The backup user
* @param mediaId The mediaId to add
* @param mediaLength The length of the media before encryption (the length of the source media)
* @return A stage that completes successfully once the tables are updated
* @param mediaLength The length of the media after encryption. A negative length implies the media is being removed
* @return A stage that completes successfully once the table are updated.
*/
CompletableFuture<Void> untrackMedia(
final AuthenticatedBackupUser backupUser,
final byte[] mediaId,
final int mediaLength) {
final byte[] hashedBackupId = hashedBackupId(backupUser);
return dynamoClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(
TransactWriteItem.builder().delete(Delete.builder()
.tableName(backupMediaTableName)
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.key(Map.of(
KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId),
KEY_MEDIA_ID, AttributeValues.b(mediaId)
))
.conditionExpression("#length = :length")
.expressionAttributeNames(Map.of("#length", ATTR_LENGTH))
.expressionAttributeValues(Map.of(":length", AttributeValues.n(mediaLength)))
.build()).build(),
// Don't update TTLs, since we're just cleaning up media
TransactWriteItem.builder().update(UpdateBuilder.forUser(backupTableName, backupUser)
.incrementMediaBytes(-mediaLength)
.incrementMediaCount(-1)
.transactItemBuilder().build()).build()).build())
.exceptionally(error -> {
logger.warn("failed cleanup after failed copy operation", error);
return null;
})
/**
 * Adjust the stored media usage counters for the backup user.
 *
 * @param backupUser  an already ZK authenticated backup user
 * @param mediaLength the length of the media after encryption; a negative length implies the
 *                    media is being removed and deducts from the quota
 * @return a stage that completes once the backups table row has been updated
 */
CompletableFuture<Void> trackMedia(final AuthenticatedBackupUser backupUser, final int mediaLength) {
final Instant now = clock.instant();
return dynamoClient
.updateItem(
// Update the media quota and TTL
UpdateBuilder.forUser(backupTableName, backupUser)
.setRefreshTimes(now)
.incrementMediaBytes(mediaLength)
// signum: object count moves by exactly one per call — +1 on add, -1 on removal
.incrementMediaCount(Integer.signum(mediaLength))
.updateItemBuilder()
.build())
.thenRun(Util.NOOP);
}
/**
* Update the last update timestamps for the backupId in the presentation
*
@@ -249,6 +160,7 @@ public class BackupsDb {
/**
* Track that a backup will be stored for the user
*
* @param backupUser an already authorized backup user
*/
CompletableFuture<Void> addMessageBackup(final AuthenticatedBackupUser backupUser) {
@@ -276,8 +188,8 @@ public class BackupsDb {
return dynamoClient.getItem(GetItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser))))
.projectionExpression("#cdn,#bytesUsed")
.expressionAttributeNames(Map.of("#cdn", ATTR_CDN, "#bytesUsed", ATTR_MEDIA_BYTES_USED))
.projectionExpression("#cdn,#mediaBytesUsed")
.expressionAttributeNames(Map.of("#cdn", ATTR_CDN, "#mediaBytesUsed", ATTR_MEDIA_BYTES_USED))
.consistentRead(true)
.build())
.thenApply(response -> {
@@ -297,6 +209,46 @@ public class BackupsDb {
});
}
/** Usage counters plus the time they were last reconciled against the CDN. */
public record TimestampedUsageInfo(UsageInfo usageInfo, Instant lastRecalculationTime) {}
/**
 * Read the media usage (bytes and object count) recorded for the backup user.
 *
 * @param backupUser an already ZK authenticated backup user
 * @return the recorded usage. Missing attributes default to 0, so an absent recalculation
 *         timestamp yields the epoch and will always be considered stale by callers.
 */
CompletableFuture<TimestampedUsageInfo> getMediaUsage(final AuthenticatedBackupUser backupUser) {
return dynamoClient.getItem(GetItemRequest.builder()
.tableName(backupTableName)
.key(Map.of(KEY_BACKUP_ID_HASH, AttributeValues.b(hashedBackupId(backupUser))))
.projectionExpression("#mediaBytesUsed,#mediaCount,#usageRecalc")
.expressionAttributeNames(Map.of(
"#mediaBytesUsed", ATTR_MEDIA_BYTES_USED,
"#mediaCount", ATTR_MEDIA_COUNT,
"#usageRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION))
// consistent read: quota enforcement decisions are made from this value
.consistentRead(true)
.build())
.thenApply(response -> {
final long mediaUsed = AttributeValues.getLong(response.item(), ATTR_MEDIA_BYTES_USED, 0L);
final long mediaCount = AttributeValues.getLong(response.item(), ATTR_MEDIA_COUNT, 0L);
final long recalcSeconds = AttributeValues.getLong(response.item(), ATTR_MEDIA_USAGE_LAST_RECALCULATION, 0L);
return new TimestampedUsageInfo(new UsageInfo(mediaUsed, mediaCount), Instant.ofEpochSecond(recalcSeconds));
});
}
/**
 * Overwrite the recorded media usage with an authoritative value (typically the result of a
 * direct CDN recalculation), and stamp the recalculation time with the current clock.
 *
 * @param backupUser an already ZK authenticated backup user
 * @param usageInfo  the authoritative bytes/object-count usage to store
 * @return a stage that completes once the backups table row has been updated
 */
CompletableFuture<Void> setMediaUsage(final AuthenticatedBackupUser backupUser, UsageInfo usageInfo) {
return dynamoClient.updateItem(
UpdateBuilder.forUser(backupTableName, backupUser)
.addSetExpression("#mediaBytesUsed = :mediaBytesUsed",
Map.entry("#mediaBytesUsed", ATTR_MEDIA_BYTES_USED),
Map.entry(":mediaBytesUsed", AttributeValues.n(usageInfo.bytesUsed())))
.addSetExpression("#mediaCount = :mediaCount",
Map.entry("#mediaCount", ATTR_MEDIA_COUNT),
Map.entry(":mediaCount", AttributeValues.n(usageInfo.numObjects())))
// record *when* this authoritative value was computed so staleness can be judged later
.addSetExpression("#mediaRecalc = :mediaRecalc",
Map.entry("#mediaRecalc", ATTR_MEDIA_USAGE_LAST_RECALCULATION),
Map.entry(":mediaRecalc", AttributeValues.n(clock.instant().getEpochSecond())))
.updateItemBuilder()
.build())
.thenRun(Util.NOOP);
}
/**
* Build ddb update statements for the backups table
@@ -396,19 +348,22 @@ public class BackupsDb {
* Set the lastRefresh time as part of the update
* <p>
* This always updates lastRefreshTime, and updates lastMediaRefreshTime if the backup user has the appropriate
* tier
* tier.
*/
UpdateBuilder setRefreshTimes(final Clock clock) {
final long refreshTimeSecs = clock.instant().getEpochSecond();
return this.setRefreshTimes(clock.instant());
}
UpdateBuilder setRefreshTimes(final Instant refreshTime) {
addSetExpression("#lastRefreshTime = :lastRefreshTime",
Map.entry("#lastRefreshTime", ATTR_LAST_REFRESH),
Map.entry(":lastRefreshTime", AttributeValues.n(refreshTimeSecs)));
Map.entry(":lastRefreshTime", AttributeValues.n(refreshTime.getEpochSecond())));
if (backupTier.compareTo(BackupTier.MEDIA) >= 0) {
// update the media time if we have the appropriate tier
addSetExpression("#lastMediaRefreshTime = :lastMediaRefreshTime",
Map.entry("#lastMediaRefreshTime", ATTR_LAST_MEDIA_REFRESH),
Map.entry(":lastMediaRefreshTime", AttributeValues.n(refreshTimeSecs)));
Map.entry(":lastMediaRefreshTime", AttributeValues.n(refreshTime.getEpochSecond())));
}
return this;
}
@@ -462,28 +417,4 @@ public class BackupsDb {
throw new AssertionError(e);
}
}
/**
 * Inspect a DynamoDb transaction-cancelled error and, if the item at {@code itemIndex} was
 * cancelled by a failed condition check, return that item's remote attribute values.
 *
 * @param e         the error returned by a {@link DynamoDbAsyncClient#transactWriteItems} attempt
 * @param itemIndex the index of the transaction item that carried a condition expression
 * @return the remote item that failed the condition check, or empty for any other cancellation
 */
private static Optional<Map<String, AttributeValue>> conditionCheckFailed(TransactionCanceledException e,
int itemIndex) {
  // No reasons at all, or not enough reasons to cover the requested index
  if (!e.hasCancellationReasons() || e.cancellationReasons().size() <= itemIndex) {
    return Optional.empty();
  }
  final CancellationReason reason = e.cancellationReasons().get(itemIndex);
  final boolean wasConditionFailure = "ConditionalCheckFailed".equals(reason.code()) && reason.hasItem();
  return wasConditionFailure ? Optional.of(reason.item()) : Optional.empty();
}
}

View File

@@ -1,41 +1,94 @@
package org.whispersystems.textsecuregcm.backup;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.Cdn3StorageManagerConfiguration;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.HttpUtils;
import org.whispersystems.textsecuregcm.util.SystemMapper;
public class Cdn3RemoteStorageManager implements RemoteStorageManager {
private final FaultTolerantHttpClient httpClient;
private static final Logger logger = LoggerFactory.getLogger(Cdn3RemoteStorageManager.class);
private final FaultTolerantHttpClient cdnHttpClient;
private final FaultTolerantHttpClient storageManagerHttpClient;
private final String storageManagerBaseUrl;
private final String clientId;
private final String clientSecret;
static final String CLIENT_ID_HEADER = "CF-Access-Client-Id";
static final String CLIENT_SECRET_HEADER = "CF-Access-Client-Secret";
private static final String STORAGE_MANAGER_STATUS_COUNTER_NAME = MetricsUtil.name(Cdn3RemoteStorageManager.class,
"storageManagerStatus");
private static final String STORAGE_MANAGER_TIMER_NAME = MetricsUtil.name(Cdn3RemoteStorageManager.class,
"storageManager");
private static final String OPERATION_TAG_NAME = "op";
private static final String STATUS_TAG_NAME = "status";
public Cdn3RemoteStorageManager(
final ScheduledExecutorService retryExecutor,
final CircuitBreakerConfiguration circuitBreakerConfiguration,
final RetryConfiguration retryConfiguration,
final List<String> caCertificates) throws CertificateException {
this.httpClient = FaultTolerantHttpClient.newBuilder()
.withName("cdn3-remote-storage")
final List<String> cdnCaCertificates,
final Cdn3StorageManagerConfiguration configuration) throws CertificateException {
// strip trailing "/" for easier URI construction
this.storageManagerBaseUrl = StringUtils.removeEnd(configuration.baseUri(), "/");
this.clientId = configuration.clientId();
this.clientSecret = configuration.clientSecret();
// Client used to read/write to cdn
this.cdnHttpClient = FaultTolerantHttpClient.newBuilder()
.withName("cdn-client")
.withCircuitBreaker(circuitBreakerConfiguration)
.withExecutor(Executors.newCachedThreadPool())
.withRetryExecutor(retryExecutor)
.withRetry(retryConfiguration)
.withConnectTimeout(Duration.ofSeconds(10))
.withVersion(HttpClient.Version.HTTP_2)
.withTrustedServerCertificates(cdnCaCertificates.toArray(new String[0]))
.build();
// Client used for calls to storage-manager
// storage-manager has an external CA so uses a different client
this.storageManagerHttpClient = FaultTolerantHttpClient.newBuilder()
.withName("cdn3-storage-manager")
.withCircuitBreaker(circuitBreakerConfiguration)
.withExecutor(Executors.newCachedThreadPool())
.withRetryExecutor(retryExecutor)
.withRetry(retryConfiguration)
.withConnectTimeout(Duration.ofSeconds(10))
.withVersion(HttpClient.Version.HTTP_2)
.withTrustedServerCertificates(caCertificates.toArray(new String[0]))
.build();
}
@@ -55,10 +108,10 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
throw new IllegalArgumentException("Cdn3RemoteStorageManager can only copy to cdn3");
}
final Timer.Sample sample = Timer.start();
final BackupMediaEncrypter encrypter = new BackupMediaEncrypter(encryptionParameters);
final HttpRequest request = HttpRequest.newBuilder().GET().uri(sourceUri).build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
return cdnHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()).thenCompose(response -> {
if (response.statusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
throw new CompletionException(new SourceObjectNotFoundException());
} else if (response.statusCode() != Response.Status.OK.getStatusCode()) {
@@ -90,13 +143,122 @@ public class Cdn3RemoteStorageManager implements RemoteStorageManager {
.POST(encryptedBody)
.build();
return httpClient.sendAsync(put, HttpResponse.BodyHandlers.discarding());
return cdnHttpClient.sendAsync(put, HttpResponse.BodyHandlers.discarding());
})
.thenAccept(response -> {
if (response.statusCode() != Response.Status.CREATED.getStatusCode() &&
response.statusCode() != Response.Status.OK.getStatusCode()) {
throw new CompletionException(new IOException("Failed to copy object: " + response.statusCode()));
}
});
})
.whenComplete((ignored, ignoredException) ->
sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "copy")));
}
/**
 * List objects stored under {@code prefix} by delegating to the cdn3 storage-manager service.
 *
 * @param prefix the object key prefix to list
 * @param cursor opaque continuation token returned by a previous call, if resuming
 * @param limit  the maximum number of entries to return
 * @return a stage completing with the parsed {@link ListResult}
 */
@Override
public CompletionStage<ListResult> list(
    final String prefix,
    final Optional<String> cursor,
    final long limit) {
  final Timer.Sample sample = Timer.start();
  final Map<String, String> queryParams = new HashMap<>();
  queryParams.put("prefix", prefix);
  queryParams.put("limit", Long.toString(limit));
  // Use the lambda parameter instead of re-reading the Optional with get()
  cursor.ifPresent(c -> queryParams.put("cursor", c));
  final HttpRequest request = HttpRequest.newBuilder().GET()
      .uri(URI.create("%s/%s/%s".formatted(
          storageManagerBaseUrl,
          Cdn3BackupCredentialGenerator.CDN_PATH,
          HttpUtils.queryParamString(queryParams.entrySet()))))
      .header(CLIENT_ID_HEADER, clientId)
      .header(CLIENT_SECRET_HEADER, clientSecret)
      .build();
  return this.storageManagerHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
      .thenApply(response -> {
        // Count every response status so upstream failures are visible in metrics
        Metrics.counter(STORAGE_MANAGER_STATUS_COUNTER_NAME,
            OPERATION_TAG_NAME, "list",
            STATUS_TAG_NAME, Integer.toString(response.statusCode()))
            .increment();
        try {
          return parseListResponse(response, prefix);
        } catch (IOException e) {
          // Surface HTTP/parse failures as an exceptional completion
          throw ExceptionUtils.wrap(e);
        }
      })
      .whenComplete((ignored, ignoredException) ->
          sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "list")));
}
/**
 * Serialized list response from storage manager
 *
 * @param objects the object entries in this page of results
 * @param cursor  continuation token for the next page, or null on the final page
 */
record Cdn3ListResponse(@NotNull List<Entry> objects, @Nullable String cursor) {

  /**
   * @param key  the full object key
   * @param size the object's length in bytes
   */
  // @NotNull removed from the primitive component: a long can never be null,
  // so the constraint was a no-op.
  record Entry(@NotNull String key, long size) {}
}
/**
 * Parse and validate a storage-manager list response.
 *
 * @param httpListResponse the raw HTTP response from the storage-manager
 * @param prefix           the prefix that was requested; every returned key must start with it
 * @return the parsed {@link ListResult} with the prefix stripped from each key
 * @throws IOException on a non-2xx status, malformed JSON, or an entry outside the prefix
 */
private static ListResult parseListResponse(final HttpResponse<InputStream> httpListResponse, final String prefix)
    throws IOException {
  if (!HttpUtils.isSuccessfulResponse(httpListResponse.statusCode())) {
    throw new IOException("Failed to list objects: " + httpListResponse.statusCode());
  }
  final Cdn3ListResponse result = SystemMapper.jsonMapper()
      .readValue(httpListResponse.body(), Cdn3ListResponse.class);
  // Use the record accessors consistently (the original mixed direct field access with accessors)
  final List<ListResult.Entry> objects = new ArrayList<>(result.objects().size());
  for (Cdn3ListResponse.Entry entry : result.objects()) {
    if (!entry.key().startsWith(prefix)) {
      // A key outside the requested prefix indicates a server-side listing bug; fail loudly
      // rather than silently returning another backup's objects.
      logger.error("unexpected listing result from cdn3 - entry {} does not contain requested prefix {}",
          entry.key(), prefix);
      throw new IOException("prefix listing returned unexpected result");
    }
    objects.add(new ListResult.Entry(entry.key().substring(prefix.length()), entry.size()));
  }
  return new ListResult(objects, Optional.ofNullable(result.cursor()));
}
/**
 * Serialized usage response from storage manager
 *
 * @param numObjects the number of objects stored under the queried prefix
 * @param bytesUsed  the total number of bytes stored under the queried prefix
 */
// @NotNull removed from both components: primitives can never be null, so the
// constraints were no-ops.
record UsageResponse(long numObjects, long bytesUsed) {}
/**
 * Ask the storage-manager to compute the total bytes and object count stored under a prefix.
 *
 * @param prefix the prefix of the objects to sum
 * @return a stage that completes with the {@link UsageInfo} reported by the storage-manager
 */
@Override
public CompletionStage<UsageInfo> calculateBytesUsed(final String prefix) {
final Timer.Sample sample = Timer.start();
final HttpRequest request = HttpRequest.newBuilder().GET()
.uri(URI.create("%s/usage%s".formatted(
storageManagerBaseUrl,
HttpUtils.queryParamString(Map.of("prefix", prefix).entrySet()))))
.header(CLIENT_ID_HEADER, clientId)
.header(CLIENT_SECRET_HEADER, clientSecret)
.build();
return this.storageManagerHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
.thenApply(response -> {
// Count every response status so upstream failures are visible in metrics
Metrics.counter(STORAGE_MANAGER_STATUS_COUNTER_NAME,
OPERATION_TAG_NAME, "usage",
STATUS_TAG_NAME, Integer.toString(response.statusCode()))
.increment();
try {
return parseUsageResponse(response);
} catch (IOException e) {
// Surface HTTP/parse failures as an exceptional completion
throw ExceptionUtils.wrap(e);
}
})
.whenComplete((ignored, ignoredException) ->
sample.stop(Metrics.timer(STORAGE_MANAGER_TIMER_NAME, OPERATION_TAG_NAME, "usage")));
}
/**
 * Parse a storage-manager usage response into a {@link UsageInfo}.
 *
 * @param httpUsageResponse the raw HTTP response from the storage-manager
 * @return the reported usage
 * @throws IOException on a non-2xx status or malformed JSON
 */
private static UsageInfo parseUsageResponse(final HttpResponse<InputStream> httpUsageResponse) throws IOException {
  if (!HttpUtils.isSuccessfulResponse(httpUsageResponse.statusCode())) {
    throw new IOException("Failed to retrieve usage: " + httpUsageResponse.statusCode());
  }
  final UsageResponse response = SystemMapper.jsonMapper().readValue(httpUsageResponse.body(), UsageResponse.class);
  // Use record accessors consistently (was mixing response.bytesUsed() with direct field access)
  return new UsageInfo(response.bytesUsed(), response.numObjects());
}
}

View File

@@ -14,4 +14,11 @@ public record MediaEncryptionParameters(
new SecretKeySpec(macKey, "HmacSHA256"),
new IvParameterSpec(iv));
}
/**
 * Computes the total encrypted size produced for a plaintext of the given length.
 *
 * @param inputSize the plaintext length in bytes
 * @return the ciphertext length: IV + padded AES output + 32-byte HmacSHA256 tag
 */
public int outputSize(final int inputSize) {
// AES has a 16-byte block size; the padding scheme always adds a full extra block when the
// plaintext is already a multiple of the block size (PKCS#5/7 behavior — the cipher config
// isn't visible here, TODO confirm), so round up to the next whole block.
final int numBlocks = (inputSize + 16) / 16;
// IV + AES-256 encrypted data + HmacSHA256
return this.iv().getIV().length + (numBlocks * 16) + 32;
}
}

View File

@@ -1,6 +1,8 @@
package org.whispersystems.textsecuregcm.backup;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
/**
@@ -35,4 +37,41 @@ public interface RemoteStorageManager {
int expectedSourceLength,
MediaEncryptionParameters encryptionParameters,
MessageBackupUploadDescriptor uploadDescriptor);
/**
* Result of a {@link #list} operation
*
* @param objects An {@link Entry} for each object returned by the list request
* @param cursor An opaque string that can be used to resume listing from where a previous request left off, empty if
* the request reached the end of the list of matching objects.
*/
record ListResult(List<Entry> objects, Optional<String> cursor) {
/**
* An entry representing a remote stored object under a prefix
*
* @param key The name of the object with the prefix removed
* @param length The length of the object in bytes
*/
record Entry(String key, long length) {}
}
/**
* List objects on the remote cdn.
*
* @param prefix The prefix of the objects to list
* @param cursor The cursor returned by a previous call to list, or empty if starting from the first object with the
* provided prefix
* @param limit The maximum number of items to return in the list
* @return A {@link ListResult} of objects that match the prefix.
*/
CompletionStage<ListResult> list(final String prefix, final Optional<String> cursor, final long limit);
/**
* Calculate the total number of bytes stored by objects with the provided prefix
*
* @param prefix The prefix of the objects to sum
* @return The number of bytes used
*/
CompletionStage<UsageInfo> calculateBytesUsed(final String prefix);
}

View File

@@ -0,0 +1,7 @@
/*
 * Copyright 2024 Signal Messenger, LLC
 * SPDX-License-Identifier: AGPL-3.0-only
 */
package org.whispersystems.textsecuregcm.backup;

/**
 * Aggregate storage usage for a backup.
 *
 * @param bytesUsed  total bytes stored under the backup's prefix
 * @param numObjects total number of objects stored under the backup's prefix
 */
public record UsageInfo(long bytesUsed, long numObjects) {}