mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 13:18:02 +01:00
Remove legacy delete-via-REST plumbing
This commit is contained in:
committed by
Jon Chambers
parent
7604306818
commit
389d44fd80
@@ -5,8 +5,8 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
import static io.micrometer.core.instrument.Metrics.timer;
|
||||
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@@ -30,9 +30,6 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import software.amazon.awssdk.core.SdkBytes;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
@@ -51,9 +48,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
@VisibleForTesting
|
||||
static final String KEY_SORT = "S";
|
||||
|
||||
@VisibleForTesting
|
||||
static final String LOCAL_INDEX_MESSAGE_UUID_NAME = "Message_UUID_Index";
|
||||
|
||||
@VisibleForTesting
|
||||
static final String LOCAL_INDEX_MESSAGE_UUID_KEY_SORT = "U";
|
||||
|
||||
@@ -69,7 +63,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
private final String tableName;
|
||||
private final Duration timeToLive;
|
||||
private final ExecutorService messageDeletionExecutor;
|
||||
private final Scheduler messageDeletionScheduler;
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
|
||||
@@ -84,7 +77,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
this.timeToLive = timeToLive;
|
||||
|
||||
this.messageDeletionExecutor = messageDeletionExecutor;
|
||||
this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor);
|
||||
this.experimentEnrollmentManager = experimentEnrollmentManager;
|
||||
}
|
||||
|
||||
@@ -164,48 +156,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
||||
.filter(Predicate.not(Objects::isNull));
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
|
||||
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) {
|
||||
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
|
||||
final QueryRequest queryRequest = QueryRequest.builder()
|
||||
.tableName(tableName)
|
||||
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
|
||||
.projectionExpression(KEY_SORT)
|
||||
.consistentRead(true)
|
||||
.keyConditionExpression("#part = :part AND #uuid = :uuid")
|
||||
.expressionAttributeNames(Map.of(
|
||||
"#part", KEY_PARTITION,
|
||||
"#uuid", LOCAL_INDEX_MESSAGE_UUID_KEY_SORT))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":part", partitionKey,
|
||||
":uuid", convertLocalIndexMessageUuidSortKey(messageUuid)))
|
||||
.build();
|
||||
|
||||
// because we are filtering on message UUID, this query should return at most one item,
|
||||
// but it’s simpler to handle the full stream and return the “last” item
|
||||
return Flux.from(dbAsyncClient.queryPaginator(queryRequest).items())
|
||||
.flatMap(item -> Mono.fromCompletionStage(dbAsyncClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT,
|
||||
AttributeValues.fromByteArray(item.get(KEY_SORT).b().asByteArray())))
|
||||
.returnValues(ReturnValue.ALL_OLD)
|
||||
.build())))
|
||||
.mapNotNull(deleteItemResponse -> {
|
||||
try {
|
||||
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
|
||||
return convertItemToEnvelope(deleteItemResponse.attributes(), experimentEnrollmentManager);
|
||||
}
|
||||
} catch (final InvalidProtocolBufferException e) {
|
||||
logger.error("Failed to parse envelope", e);
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.map(Optional::ofNullable)
|
||||
.subscribeOn(messageDeletionScheduler)
|
||||
.last(Optional.empty()) // if the flux is empty, last() will throw without a default
|
||||
.toFuture();
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
|
||||
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
|
||||
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.reactivestreams.Publisher;
|
||||
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
|
||||
Reference in New Issue
Block a user