mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 15:48:05 +01:00
Consolidate avatar deletion logic in ProfilesManager
This commit is contained in:
@@ -872,7 +872,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
.addService(ExternalServiceCredentialsGrpcService.createForAllExternalServices(config, rateLimiters))
|
||||
.addService(new KeysGrpcService(accountsManager, keysManager, rateLimiters))
|
||||
.addService(new ProfileGrpcService(clock, accountsManager, profilesManager, dynamicConfigurationManager,
|
||||
config.getBadges(), asyncCdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations, config.getCdnConfiguration().bucket()));
|
||||
config.getBadges(), profileCdnPolicyGenerator, profileCdnPolicySigner, profileBadgeConverter, rateLimiters, zkProfileOperations));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1100,8 +1100,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
Clock.systemUTC()),
|
||||
new PaymentsController(currencyManager, paymentsCredentialsGenerator),
|
||||
new ProfileController(clock, rateLimiters, accountsManager, profilesManager, dynamicConfigurationManager,
|
||||
profileBadgeConverter, config.getBadges(), cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner,
|
||||
config.getCdnConfiguration().bucket(), zkSecretParams, zkProfileOperations, batchIdentityCheckExecutor),
|
||||
profileBadgeConverter, config.getBadges(), profileCdnPolicyGenerator, profileCdnPolicySigner,
|
||||
zkSecretParams, zkProfileOperations, batchIdentityCheckExecutor),
|
||||
new ProvisioningController(rateLimiters, provisioningManager),
|
||||
new RegistrationController(accountsManager, phoneVerificationTokenManager, registrationLockVerificationManager,
|
||||
rateLimiters),
|
||||
|
||||
@@ -96,8 +96,6 @@ import org.whispersystems.textsecuregcm.util.ProfileHelper;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.whispersystems.websocket.auth.Mutable;
|
||||
import org.whispersystems.websocket.auth.ReadOnly;
|
||||
import software.amazon.awssdk.services.s3.S3Client;
|
||||
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
@Path("/v1/profile")
|
||||
@@ -116,9 +114,6 @@ public class ProfileController {
|
||||
private final ServerSecretParams serverSecretParams;
|
||||
private final ServerZkProfileOperations zkProfileOperations;
|
||||
|
||||
private final S3Client s3client;
|
||||
private final String bucket;
|
||||
|
||||
private final Executor batchIdentityCheckExecutor;
|
||||
|
||||
private static final String EXPIRING_PROFILE_KEY_CREDENTIAL_TYPE = "expiringProfileKey";
|
||||
@@ -134,10 +129,8 @@ public class ProfileController {
|
||||
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
|
||||
ProfileBadgeConverter profileBadgeConverter,
|
||||
BadgesConfiguration badgesConfiguration,
|
||||
S3Client s3client,
|
||||
PostPolicyGenerator policyGenerator,
|
||||
PolicySigner policySigner,
|
||||
String bucket,
|
||||
ServerSecretParams serverSecretParams,
|
||||
ServerZkProfileOperations zkProfileOperations,
|
||||
Executor batchIdentityCheckExecutor) {
|
||||
@@ -151,8 +144,6 @@ public class ProfileController {
|
||||
BadgeConfiguration::getId, Function.identity()));
|
||||
this.serverSecretParams = serverSecretParams;
|
||||
this.zkProfileOperations = zkProfileOperations;
|
||||
this.bucket = bucket;
|
||||
this.s3client = s3client;
|
||||
this.policyGenerator = policyGenerator;
|
||||
this.policySigner = policySigner;
|
||||
this.batchIdentityCheckExecutor = Preconditions.checkNotNull(batchIdentityCheckExecutor);
|
||||
@@ -200,10 +191,7 @@ public class ProfileController {
|
||||
request.commitment().serialize()));
|
||||
|
||||
if (request.getAvatarChange() != CreateProfileRequest.AvatarChange.UNCHANGED) {
|
||||
currentAvatar.ifPresent(s -> s3client.deleteObject(DeleteObjectRequest.builder()
|
||||
.bucket(bucket)
|
||||
.key(s)
|
||||
.build()));
|
||||
currentAvatar.ifPresent(s -> profilesManager.deleteAvatar(s).join());
|
||||
}
|
||||
|
||||
accountsManager.update(auth.getAccount(), a -> {
|
||||
|
||||
@@ -50,8 +50,6 @@ import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.ProfileHelper;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
|
||||
|
||||
public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
|
||||
|
||||
@@ -60,13 +58,11 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
|
||||
private final ProfilesManager profilesManager;
|
||||
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
|
||||
private final Map<String, BadgeConfiguration> badgeConfigurationMap;
|
||||
private final S3AsyncClient asyncS3client;
|
||||
private final PostPolicyGenerator policyGenerator;
|
||||
private final PolicySigner policySigner;
|
||||
private final ProfileBadgeConverter profileBadgeConverter;
|
||||
private final RateLimiters rateLimiters;
|
||||
private final ServerZkProfileOperations zkProfileOperations;
|
||||
private final String bucket;
|
||||
|
||||
private record AvatarData(Optional<String> currentAvatar,
|
||||
Optional<String> finalAvatar,
|
||||
@@ -78,26 +74,22 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
|
||||
final ProfilesManager profilesManager,
|
||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
|
||||
final BadgesConfiguration badgesConfiguration,
|
||||
final S3AsyncClient asyncS3client,
|
||||
final PostPolicyGenerator policyGenerator,
|
||||
final PolicySigner policySigner,
|
||||
final ProfileBadgeConverter profileBadgeConverter,
|
||||
final RateLimiters rateLimiters,
|
||||
final ServerZkProfileOperations zkProfileOperations,
|
||||
final String bucket) {
|
||||
final ServerZkProfileOperations zkProfileOperations) {
|
||||
this.clock = clock;
|
||||
this.accountsManager = accountsManager;
|
||||
this.profilesManager = profilesManager;
|
||||
this.dynamicConfigurationManager = dynamicConfigurationManager;
|
||||
this.badgeConfigurationMap = badgesConfiguration.getBadges().stream().collect(Collectors.toMap(
|
||||
BadgeConfiguration::getId, Function.identity()));
|
||||
this.asyncS3client = asyncS3client;
|
||||
this.policyGenerator = policyGenerator;
|
||||
this.policySigner = policySigner;
|
||||
this.profileBadgeConverter = profileBadgeConverter;
|
||||
this.rateLimiters = rateLimiters;
|
||||
this.zkProfileOperations = zkProfileOperations;
|
||||
this.bucket = bucket;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -157,10 +149,7 @@ public class ProfileGrpcService extends ReactorProfileGrpc.ProfileImplBase {
|
||||
})));
|
||||
|
||||
if (request.getAvatarChange() != AvatarChange.AVATAR_CHANGE_UNCHANGED && avatarData.currentAvatar().isPresent()) {
|
||||
updates.add(Mono.fromFuture(() -> asyncS3client.deleteObject(DeleteObjectRequest.builder()
|
||||
.bucket(bucket)
|
||||
.key(avatarData.currentAvatar().get())
|
||||
.build())));
|
||||
updates.add(Mono.fromFuture(() -> profilesManager.deleteAvatar(avatarData.currentAvatar.get())));
|
||||
}
|
||||
return profileSetMono.thenMany(Flux.merge(updates)).then(Mono.just(avatarData));
|
||||
})
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@@ -15,6 +17,7 @@ import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
import javax.annotation.Nullable;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
|
||||
@@ -36,8 +39,7 @@ public class ProfilesManager {
|
||||
private final String bucket;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
private static final CompletableFuture<?>[] EMPTY_FUTURE_ARRAY = new CompletableFuture[0];
|
||||
|
||||
private static final String DELETE_AVATAR_COUNTER_NAME = name(ProfilesManager.class, "deleteAvatar");
|
||||
|
||||
public ProfilesManager(final Profiles profiles, final FaultTolerantRedisClusterClient cacheCluster, final S3AsyncClient s3Client,
|
||||
final String bucket) {
|
||||
@@ -63,19 +65,35 @@ public class ProfilesManager {
|
||||
final CompletableFuture<Void> profilesAndAvatars = Mono.fromFuture(profiles.deleteAll(uuid))
|
||||
.flatMapIterable(Function.identity())
|
||||
.flatMap(avatar ->
|
||||
Mono.fromFuture(s3Client.deleteObject(DeleteObjectRequest.builder()
|
||||
.bucket(bucket)
|
||||
.key(avatar)
|
||||
.build()))
|
||||
Mono.fromFuture(deleteAvatar(avatar))
|
||||
// this is best-effort
|
||||
.retry(3)
|
||||
.onErrorComplete()
|
||||
.then()
|
||||
).then().toFuture();
|
||||
.onErrorComplete())
|
||||
.then().toFuture();
|
||||
|
||||
return CompletableFuture.allOf(redisDelete(uuid), profilesAndAvatars);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> deleteAvatar(String avatar) {
|
||||
return s3Client.deleteObject(DeleteObjectRequest.builder()
|
||||
.bucket(bucket)
|
||||
.key(avatar)
|
||||
.build())
|
||||
.handle((ignored, throwable) -> {
|
||||
final String outcome;
|
||||
if (throwable != null) {
|
||||
logger.warn("Error deleting avatar", throwable);
|
||||
outcome = "error";
|
||||
} else {
|
||||
outcome = "success";
|
||||
}
|
||||
|
||||
Metrics.counter(DELETE_AVATAR_COUNTER_NAME, "outcome", outcome).increment();
|
||||
return null;
|
||||
})
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
public Optional<VersionedProfile> get(UUID uuid, String version) {
|
||||
Optional<VersionedProfile> profile = redisGet(uuid, version);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user