Delete avatars in ProfilesManager#deleteAll

This commit is contained in:
Chris Eager
2025-05-22 12:35:07 -05:00
committed by Chris Eager
parent 8491d18413
commit c1a66e0418
7 changed files with 132 additions and 53 deletions

View File

@@ -394,6 +394,12 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final DynamoDbClient dynamoDbClient = config.getDynamoDbClientConfiguration()
.buildSyncClient(awsCredentialsProvider, new MicrometerAwsSdkMetricPublisher(awsSdkMetricsExecutor, "dynamoDbSync"));
final AwsCredentialsProvider cdnCredentialsProvider = config.getCdnConfiguration().credentials().build();
final S3AsyncClient asyncCdnS3Client = S3AsyncClient.builder()
.credentialsProvider(cdnCredentialsProvider)
.region(Region.of(config.getCdnConfiguration().region()))
.build();
BlockingQueue<Runnable> messageDeletionQueue = new LinkedBlockingQueue<>();
Metrics.gaugeCollectionSize(name(getClass(), "messageDeletionQueueSize"), Collections.emptyList(),
messageDeletionQueue);
@@ -500,8 +506,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build();
ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "registrationIdentityTokenRefresh-%d")).threads(1).build();
ScheduledExecutorService recurringConfigSyncExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "configSync-%d")).threads(1).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
@@ -610,7 +614,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, storageServiceRetryExecutor, config.getSecureStorageServiceConfiguration());
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, disconnectionRequestListenerExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, asyncCdnS3Client, config.getCdnConfiguration().bucket());
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, clock);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
@@ -745,17 +749,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(virtualThreadPinEventMonitor);
environment.lifecycle().manage(accountsManager);
AwsCredentialsProvider cdnCredentialsProvider = config.getCdnConfiguration().credentials().build();
S3Client cdnS3Client = S3Client.builder()
final S3Client cdnS3Client = S3Client.builder()
.credentialsProvider(cdnCredentialsProvider)
.region(Region.of(config.getCdnConfiguration().region()))
.httpClientBuilder(AwsCrtHttpClient.builder())
.build();
S3AsyncClient asyncCdnS3Client = S3AsyncClient.builder()
.credentialsProvider(cdnCredentialsProvider)
.region(Region.of(config.getCdnConfiguration().region()))
.build();
final GcsAttachmentGenerator gcsAttachmentGenerator = new GcsAttachmentGenerator(
config.getGcpAttachmentsConfiguration().domain(),

View File

@@ -261,7 +261,12 @@ public class Profiles {
return AttributeValues.extractByteArray(attributeValue, PARSE_BYTE_ARRAY_COUNTER_NAME);
}
public CompletableFuture<Void> deleteAll(final UUID uuid) {
/**
* Deletes all profile versions for the given UUID
*
* @return a list of avatar URLs to be deleted
*/
public CompletableFuture<List<String>> deleteAll(final UUID uuid) {
final Timer.Sample sample = Timer.start();
final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid);
@@ -271,7 +276,7 @@ public class Profiles {
.keyConditionExpression("#uuid = :uuid")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.expressionAttributeValues(Map.of(":uuid", uuidAttributeValue))
.projectionExpression(ATTR_VERSION)
.projectionExpression(String.join(", ", ATTR_VERSION, ATTR_AVATAR))
.consistentRead(true)
.build())
.items())
@@ -280,8 +285,9 @@ public class Profiles {
.key(Map.of(
KEY_ACCOUNT_UUID, uuidAttributeValue,
ATTR_VERSION, item.get(ATTR_VERSION)))
.build())), MAX_CONCURRENCY)
.then()
.build()))
.flatMap(ignored -> Mono.justOrEmpty(item.get(ATTR_AVATAR)).map(AttributeValue::s)), MAX_CONCURRENCY)
.collectList()
.doOnSuccess(ignored -> sample.stop(Metrics.timer(DELETE_PROFILES_TIMER_NAME, "outcome", "success")))
.doOnError(ignored -> sample.stop(Metrics.timer(DELETE_PROFILES_TIMER_NAME, "outcome", "error")))
.toFuture();

View File

@@ -7,17 +7,22 @@ package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.lettuce.core.RedisException;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
public class ProfilesManager {
@@ -27,13 +32,19 @@ public class ProfilesManager {
private final Profiles profiles;
private final FaultTolerantRedisClusterClient cacheCluster;
private final S3AsyncClient s3Client;
private final String bucket;
private final ObjectMapper mapper;
private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = new CompletableFuture[0];
public ProfilesManager(final Profiles profiles,
final FaultTolerantRedisClusterClient cacheCluster) {
public ProfilesManager(final Profiles profiles, final FaultTolerantRedisClusterClient cacheCluster, final S3AsyncClient s3Client,
final String bucket) {
this.profiles = profiles;
this.cacheCluster = cacheCluster;
this.s3Client = s3Client;
this.bucket = bucket;
this.mapper = SystemMapper.jsonMapper();
}
@@ -48,7 +59,21 @@ public class ProfilesManager {
}
public CompletableFuture<Void> deleteAll(UUID uuid) {
return CompletableFuture.allOf(redisDelete(uuid), profiles.deleteAll(uuid));
final CompletableFuture<Void> profilesAndAvatars = Mono.fromFuture(profiles.deleteAll(uuid))
.flatMapIterable(Function.identity())
.flatMap(avatar ->
Mono.fromFuture(s3Client.deleteObject(DeleteObjectRequest.builder()
.bucket(bucket)
.key(avatar)
.build()))
// this is best-effort
.retry(3)
.onErrorComplete()
.then()
).then().toFuture();
return CompletableFuture.allOf(redisDelete(uuid), profilesAndAvatars);
}
public Optional<VersionedProfile> get(UUID uuid, String version) {
@@ -137,7 +162,8 @@ public class ProfilesManager {
.thenRun(Util.NOOP);
}
private String getCacheKey(UUID uuid) {
@VisibleForTesting
static String getCacheKey(UUID uuid) {
return CACHE_PREFIX + uuid.toString();
}
}

View File

@@ -7,6 +7,7 @@ package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nullable;
import org.whispersystems.textsecuregcm.util.ByteArrayAdapter;
import org.whispersystems.textsecuregcm.util.ByteArrayBase64WithPaddingAdapter;
@@ -15,6 +16,7 @@ public record VersionedProfile (String version,
@JsonDeserialize(using = ByteArrayBase64WithPaddingAdapter.Deserializing.class)
byte[] name,
@Nullable
String avatar,
@JsonSerialize(using = ByteArrayBase64WithPaddingAdapter.Serializing.class)

View File

@@ -68,8 +68,10 @@ import org.whispersystems.textsecuregcm.util.ManagedAwsCrt;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
/**
* Construct utilities commonly used by worker commands
@@ -175,6 +177,13 @@ record CommandDependencies(
DynamoDbClient dynamoDbClient = configuration.getDynamoDbClientConfiguration()
.buildSyncClient(awsCredentialsProvider, new MicrometerAwsSdkMetricPublisher(awsSdkMetricsExecutor, "dynamoDbSyncCommand"));
final AwsCredentialsProvider cdnCredentialsProvider = configuration.getCdnConfiguration().credentials().build();
final S3AsyncClient asyncCdnS3Client = S3AsyncClient.builder()
.credentialsProvider(cdnCredentialsProvider)
.region(Region.of(configuration.getCdnConfiguration().region()))
.build();
RegistrationRecoveryPasswords registrationRecoveryPasswords = new RegistrationRecoveryPasswords(
configuration.getDynamoDbTables().getRegistrationRecovery().getTableName(),
configuration.getDynamoDbTables().getRegistrationRecovery().getExpiration(),
@@ -222,7 +231,8 @@ record CommandDependencies(
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, disconnectionRequestListenerExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, asyncCdnS3Client,
configuration.getCdnConfiguration().bucket());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getReportMessage().getTableName(),
configuration.getReportMessageConfiguration().getReportTtl());