mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 04:08:06 +01:00
Remove signed pre-keys transactionally when removing devices
This commit is contained in:
@@ -13,7 +13,6 @@ import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
|
||||
public class AccountLockManager {
|
||||
@@ -83,8 +82,9 @@ public class AccountLockManager {
|
||||
*
|
||||
* @return a future that completes normally when the given task has executed successfully and all locks have been
|
||||
* released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock
|
||||
*/ public CompletableFuture<Void> withLockAsync(final List<String> e164s,
|
||||
final Supplier<CompletableFuture<?>> taskSupplier,
|
||||
*/
|
||||
public <T> CompletableFuture<T> withLockAsync(final List<String> e164s,
|
||||
final Supplier<CompletableFuture<T>> taskSupplier,
|
||||
final Executor executor) {
|
||||
|
||||
if (e164s.isEmpty()) {
|
||||
@@ -107,7 +107,6 @@ public class AccountLockManager {
|
||||
.thenCompose(ignored -> taskSupplier.get())
|
||||
.whenCompleteAsync((ignored, throwable) -> lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem)
|
||||
.withBestEffort(true)
|
||||
.build())), executor)
|
||||
.thenRun(Util.NOOP);
|
||||
.build())), executor);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1035,7 +1035,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
return Optional.ofNullable(response.items().get(0).get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s());
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID uuid) {
|
||||
public CompletableFuture<Void> delete(final UUID uuid, final List<TransactWriteItem> additionalWriteItems) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return getByAccountIdentifierAsync(uuid)
|
||||
@@ -1050,6 +1050,8 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add(
|
||||
buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash)));
|
||||
|
||||
transactWriteItems.addAll(additionalWriteItems);
|
||||
|
||||
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
||||
.transactItems(transactWriteItems)
|
||||
.build())
|
||||
|
||||
@@ -43,6 +43,7 @@ import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -204,7 +205,7 @@ public class AccountsManager {
|
||||
String accountCreationType = maybeRecentlyDeletedAccountIdentifier.isPresent() ? "recently-deleted" : "new";
|
||||
|
||||
try {
|
||||
accounts.create(account, keysManager.buildWriteItemsForRepeatedUseKeys(account.getIdentifier(IdentityType.ACI),
|
||||
accounts.create(account, keysManager.buildWriteItemsForNewDevice(account.getIdentifier(IdentityType.ACI),
|
||||
account.getIdentifier(IdentityType.PNI),
|
||||
Device.PRIMARY_ID,
|
||||
primaryDeviceSpec.aciSignedPreKey(),
|
||||
@@ -220,21 +221,31 @@ public class AccountsManager {
|
||||
account.setUuid(aci);
|
||||
account.setNumber(e.getExistingAccount().getNumber(), pni);
|
||||
|
||||
CompletableFuture.allOf(
|
||||
keysManager.delete(aci),
|
||||
keysManager.delete(pni),
|
||||
messagesManager.clear(aci),
|
||||
profilesManager.deleteAll(aci))
|
||||
.thenRunAsync(() -> clientPresenceManager.disconnectAllPresencesForUuid(aci), clientPresenceExecutor)
|
||||
.thenCompose(ignored -> accounts.reclaimAccount(e.getExistingAccount(),
|
||||
account,
|
||||
keysManager.buildWriteItemsForRepeatedUseKeys(account.getIdentifier(IdentityType.ACI),
|
||||
final List<TransactWriteItem> additionalWriteItems = Stream.concat(
|
||||
keysManager.buildWriteItemsForNewDevice(account.getIdentifier(IdentityType.ACI),
|
||||
account.getIdentifier(IdentityType.PNI),
|
||||
Device.PRIMARY_ID,
|
||||
primaryDeviceSpec.aciSignedPreKey(),
|
||||
primaryDeviceSpec.pniSignedPreKey(),
|
||||
primaryDeviceSpec.aciPqLastResortPreKey(),
|
||||
primaryDeviceSpec.pniPqLastResortPreKey())))
|
||||
primaryDeviceSpec.pniPqLastResortPreKey()).stream(),
|
||||
e.getExistingAccount().getDevices()
|
||||
.stream()
|
||||
.map(Device::getId)
|
||||
// No need to clear the keys for the primary device since we'll just overwrite them in the same
|
||||
// transaction anyhow
|
||||
.filter(existingDeviceId -> existingDeviceId != Device.PRIMARY_ID)
|
||||
.flatMap(existingDeviceId ->
|
||||
keysManager.buildWriteItemsForRemovedDevice(aci, pni, existingDeviceId).stream()))
|
||||
.toList();
|
||||
|
||||
CompletableFuture.allOf(
|
||||
keysManager.deleteSingleUsePreKeys(aci),
|
||||
keysManager.deleteSingleUsePreKeys(pni),
|
||||
messagesManager.clear(aci),
|
||||
profilesManager.deleteAll(aci))
|
||||
.thenRunAsync(() -> clientPresenceManager.disconnectAllPresencesForUuid(aci), clientPresenceExecutor)
|
||||
.thenCompose(ignored -> accounts.reclaimAccount(e.getExistingAccount(), account, additionalWriteItems))
|
||||
.thenCompose(ignored -> {
|
||||
// We should have cleared all messages before overwriting the old account, but more may have arrived
|
||||
// while we were working. Similarly, the old account holder could have added keys or profiles. We'll
|
||||
@@ -243,8 +254,8 @@ public class AccountsManager {
|
||||
//
|
||||
// We exclude the primary device's repeated-use keys from deletion because new keys were provided as
|
||||
// part of the account creation process, and we don't want to delete the keys that just got added.
|
||||
return CompletableFuture.allOf(keysManager.delete(aci, true),
|
||||
keysManager.delete(pni, true),
|
||||
return CompletableFuture.allOf(keysManager.deleteSingleUsePreKeys(aci),
|
||||
keysManager.deleteSingleUsePreKeys(pni),
|
||||
messagesManager.clear(aci),
|
||||
profilesManager.deleteAll(aci));
|
||||
})
|
||||
@@ -264,7 +275,9 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
public CompletableFuture<Pair<Account, Device>> addDevice(final Account account, final DeviceSpec deviceSpec) {
|
||||
return addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, MAX_UPDATE_ATTEMPTS);
|
||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
||||
() -> addDevice(account.getIdentifier(IdentityType.ACI), deviceSpec, MAX_UPDATE_ATTEMPTS),
|
||||
accountLockExecutor);
|
||||
}
|
||||
|
||||
private CompletableFuture<Pair<Account, Device>> addDevice(final UUID accountIdentifier, final DeviceSpec deviceSpec, final int retries) {
|
||||
@@ -274,7 +287,7 @@ public class AccountsManager {
|
||||
final byte nextDeviceId = account.getNextDeviceId();
|
||||
account.addDevice(deviceSpec.toDevice(nextDeviceId, clock));
|
||||
|
||||
final List<TransactWriteItem> additionalWriteItems = keysManager.buildWriteItemsForRepeatedUseKeys(
|
||||
final List<TransactWriteItem> additionalWriteItems = keysManager.buildWriteItemsForNewDevice(
|
||||
account.getIdentifier(IdentityType.ACI),
|
||||
account.getIdentifier(IdentityType.PNI),
|
||||
nextDeviceId,
|
||||
@@ -284,8 +297,8 @@ public class AccountsManager {
|
||||
deviceSpec.pniPqLastResortPreKey());
|
||||
|
||||
return CompletableFuture.allOf(
|
||||
keysManager.delete(account.getUuid(), nextDeviceId),
|
||||
keysManager.delete(account.getPhoneNumberIdentifier(), nextDeviceId),
|
||||
keysManager.deleteSingleUsePreKeys(account.getUuid(), nextDeviceId),
|
||||
keysManager.deleteSingleUsePreKeys(account.getPhoneNumberIdentifier(), nextDeviceId),
|
||||
messagesManager.clear(account.getUuid(), nextDeviceId))
|
||||
.thenCompose(ignored -> accounts.updateTransactionallyAsync(account, additionalWriteItems))
|
||||
.thenApply(ignored -> new Pair<>(account, account.getDevice(nextDeviceId).orElseThrow()));
|
||||
@@ -306,16 +319,43 @@ public class AccountsManager {
|
||||
throw new IllegalArgumentException("Cannot remove primary device");
|
||||
}
|
||||
|
||||
return CompletableFuture.allOf(
|
||||
keysManager.delete(account.getUuid(), deviceId),
|
||||
return accountLockManager.withLockAsync(List.of(account.getNumber()),
|
||||
() -> removeDevice(account.getIdentifier(IdentityType.ACI), deviceId, MAX_UPDATE_ATTEMPTS),
|
||||
accountLockExecutor);
|
||||
}
|
||||
|
||||
private CompletableFuture<Account> removeDevice(final UUID accountIdentifier, final byte deviceId, final int retries) {
|
||||
return accounts.getByAccountIdentifierAsync(accountIdentifier)
|
||||
.thenApply(maybeAccount -> maybeAccount.orElseThrow(ContestedOptimisticLockException::new))
|
||||
.thenCompose(account -> CompletableFuture.allOf(
|
||||
keysManager.deleteSingleUsePreKeys(account.getUuid(), deviceId),
|
||||
messagesManager.clear(account.getUuid(), deviceId))
|
||||
.thenCompose(ignored -> updateAsync(account, (Consumer<Account>) a -> a.removeDevice(deviceId)))
|
||||
// ensure any messages that came in after the first clear() are also removed
|
||||
.thenCompose(updatedAccount -> messagesManager.clear(account.getUuid(), deviceId)
|
||||
.thenApply(ignored -> updatedAccount))
|
||||
.thenApply(ignored -> account))
|
||||
.thenCompose(account -> {
|
||||
account.removeDevice(deviceId);
|
||||
|
||||
return accounts.updateTransactionallyAsync(account, keysManager.buildWriteItemsForRemovedDevice(
|
||||
account.getIdentifier(IdentityType.ACI),
|
||||
account.getIdentifier(IdentityType.PNI),
|
||||
deviceId))
|
||||
.thenApply(ignored -> account);
|
||||
})
|
||||
.thenCompose(updatedAccount -> redisDeleteAsync(updatedAccount).thenApply(ignored -> updatedAccount))
|
||||
// Ensure any messages/single-use pre-keys that came in while we were working are also removed
|
||||
.thenCompose(account -> CompletableFuture.allOf(
|
||||
keysManager.deleteSingleUsePreKeys(account.getUuid(), deviceId),
|
||||
messagesManager.clear(account.getUuid(), deviceId))
|
||||
.thenApply(ignored -> account))
|
||||
.exceptionallyCompose(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof ContestedOptimisticLockException && retries > 0) {
|
||||
return removeDevice(accountIdentifier, deviceId, retries - 1);
|
||||
}
|
||||
|
||||
return CompletableFuture.failedFuture(throwable);
|
||||
})
|
||||
.whenComplete((ignored, throwable) -> {
|
||||
if (throwable == null) {
|
||||
clientPresenceManager.disconnectPresence(account.getUuid(), deviceId);
|
||||
clientPresenceManager.disconnectPresence(accountIdentifier, deviceId);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -370,12 +410,12 @@ public class AccountsManager {
|
||||
final UUID phoneNumberIdentifier = phoneNumberIdentifiers.getPhoneNumberIdentifier(targetNumber);
|
||||
|
||||
CompletableFuture.allOf(
|
||||
keysManager.delete(phoneNumberIdentifier),
|
||||
keysManager.delete(originalPhoneNumberIdentifier))
|
||||
keysManager.deleteSingleUsePreKeys(phoneNumberIdentifier),
|
||||
keysManager.deleteSingleUsePreKeys(originalPhoneNumberIdentifier))
|
||||
.join();
|
||||
|
||||
final Collection<TransactWriteItem> keyWriteItems =
|
||||
buildKeyWriteItems(uuid, phoneNumberIdentifier, pniSignedPreKeys, pniPqLastResortPreKeys);
|
||||
buildPniKeyWriteItems(uuid, phoneNumberIdentifier, pniSignedPreKeys, pniPqLastResortPreKeys);
|
||||
|
||||
final Account numberChangedAccount = updateWithRetries(
|
||||
account,
|
||||
@@ -404,10 +444,10 @@ public class AccountsManager {
|
||||
final UUID pni = account.getIdentifier(IdentityType.PNI);
|
||||
|
||||
final Collection<TransactWriteItem> keyWriteItems =
|
||||
buildKeyWriteItems(pni, pni, pniSignedPreKeys, pniPqLastResortPreKeys);
|
||||
buildPniKeyWriteItems(pni, pni, pniSignedPreKeys, pniPqLastResortPreKeys);
|
||||
|
||||
return redisDeleteAsync(account)
|
||||
.thenCompose(ignored -> keysManager.delete(pni))
|
||||
.thenCompose(ignored -> keysManager.deleteSingleUsePreKeys(pni))
|
||||
.thenCompose(ignored -> updateTransactionallyWithRetriesAsync(account,
|
||||
a -> setPniKeys(a, pniIdentityKey, pniSignedPreKeys, pniRegistrationIds),
|
||||
accounts::updateTransactionallyAsync,
|
||||
@@ -418,7 +458,7 @@ public class AccountsManager {
|
||||
.join();
|
||||
}
|
||||
|
||||
private Collection<TransactWriteItem> buildKeyWriteItems(
|
||||
private Collection<TransactWriteItem> buildPniKeyWriteItems(
|
||||
final UUID enabledDevicesIdentifier,
|
||||
final UUID phoneNumberIdentifier,
|
||||
@Nullable final Map<Byte, ECSignedPreKey> pniSignedPreKeys,
|
||||
@@ -961,16 +1001,23 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> delete(final Account account) {
|
||||
final List<TransactWriteItem> additionalWriteItems =
|
||||
account.getDevices().stream().flatMap(device -> keysManager.buildWriteItemsForRemovedDevice(
|
||||
account.getIdentifier(IdentityType.ACI),
|
||||
account.getIdentifier(IdentityType.PNI),
|
||||
device.getId()).stream())
|
||||
.toList();
|
||||
|
||||
return CompletableFuture.allOf(
|
||||
secureStorageClient.deleteStoredData(account.getUuid()),
|
||||
secureValueRecovery2Client.deleteBackups(account.getUuid()),
|
||||
keysManager.delete(account.getUuid()),
|
||||
keysManager.delete(account.getPhoneNumberIdentifier()),
|
||||
keysManager.deleteSingleUsePreKeys(account.getUuid()),
|
||||
keysManager.deleteSingleUsePreKeys(account.getPhoneNumberIdentifier()),
|
||||
messagesManager.clear(account.getUuid()),
|
||||
messagesManager.clear(account.getPhoneNumberIdentifier()),
|
||||
profilesManager.deleteAll(account.getUuid()),
|
||||
registrationRecoveryPasswordsManager.removeForNumber(account.getNumber()))
|
||||
.thenCompose(ignored -> CompletableFuture.allOf(accounts.delete(account.getUuid()), redisDeleteAsync(account)))
|
||||
.thenCompose(ignored -> CompletableFuture.allOf(accounts.delete(account.getUuid(), additionalWriteItems), redisDeleteAsync(account)))
|
||||
.thenRun(() -> RedisOperation.unchecked(() ->
|
||||
account.getDevices().forEach(device ->
|
||||
clientPresenceManager.disconnectPresence(account.getUuid(), device.getId()))));
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@@ -46,7 +47,7 @@ public class KeysManager {
|
||||
final ECSignedPreKey ecSignedPreKey) {
|
||||
|
||||
return dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().storeEcSignedPreKeys()
|
||||
? Optional.of(ecSignedPreKeys.buildTransactWriteItem(identifier, deviceId, ecSignedPreKey))
|
||||
? Optional.of(ecSignedPreKeys.buildTransactWriteItemForInsertion(identifier, deviceId, ecSignedPreKey))
|
||||
: Optional.empty();
|
||||
}
|
||||
|
||||
@@ -54,10 +55,10 @@ public class KeysManager {
|
||||
final byte deviceId,
|
||||
final KEMSignedPreKey lastResortSignedPreKey) {
|
||||
|
||||
return pqLastResortKeys.buildTransactWriteItem(identifier, deviceId, lastResortSignedPreKey);
|
||||
return pqLastResortKeys.buildTransactWriteItemForInsertion(identifier, deviceId, lastResortSignedPreKey);
|
||||
}
|
||||
|
||||
public List<TransactWriteItem> buildWriteItemsForRepeatedUseKeys(final UUID accountIdentifier,
|
||||
public List<TransactWriteItem> buildWriteItemsForNewDevice(final UUID accountIdentifier,
|
||||
final UUID phoneNumberIdentifier,
|
||||
final byte deviceId,
|
||||
final ECSignedPreKey aciSignedPreKey,
|
||||
@@ -65,10 +66,38 @@ public class KeysManager {
|
||||
final KEMSignedPreKey aciPqLastResortPreKey,
|
||||
final KEMSignedPreKey pniLastResortPreKey) {
|
||||
|
||||
return List.of(ecSignedPreKeys.buildTransactWriteItem(accountIdentifier, deviceId, aciSignedPreKey),
|
||||
ecSignedPreKeys.buildTransactWriteItem(phoneNumberIdentifier, deviceId, pniSignedPreKey),
|
||||
pqLastResortKeys.buildTransactWriteItem(accountIdentifier, deviceId, aciPqLastResortPreKey),
|
||||
pqLastResortKeys.buildTransactWriteItem(phoneNumberIdentifier, deviceId, pniLastResortPreKey));
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>(List.of(
|
||||
pqLastResortKeys.buildTransactWriteItemForInsertion(accountIdentifier, deviceId, aciPqLastResortPreKey),
|
||||
pqLastResortKeys.buildTransactWriteItemForInsertion(phoneNumberIdentifier, deviceId, pniLastResortPreKey)
|
||||
));
|
||||
|
||||
if (dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().storeEcSignedPreKeys()) {
|
||||
writeItems.addAll(List.of(
|
||||
ecSignedPreKeys.buildTransactWriteItemForInsertion(accountIdentifier, deviceId, aciSignedPreKey),
|
||||
ecSignedPreKeys.buildTransactWriteItemForInsertion(phoneNumberIdentifier, deviceId, pniSignedPreKey)
|
||||
));
|
||||
}
|
||||
|
||||
return writeItems;
|
||||
}
|
||||
|
||||
public List<TransactWriteItem> buildWriteItemsForRemovedDevice(final UUID accountIdentifier,
|
||||
final UUID phoneNumberIdentifier,
|
||||
final byte deviceId) {
|
||||
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>(List.of(
|
||||
pqLastResortKeys.buildTransactWriteItemForDeletion(accountIdentifier, deviceId),
|
||||
pqLastResortKeys.buildTransactWriteItemForDeletion(phoneNumberIdentifier, deviceId)
|
||||
));
|
||||
|
||||
if (dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().deleteEcSignedPreKeys()) {
|
||||
writeItems.addAll(List.of(
|
||||
ecSignedPreKeys.buildTransactWriteItemForDeletion(accountIdentifier, deviceId),
|
||||
ecSignedPreKeys.buildTransactWriteItemForDeletion(phoneNumberIdentifier, deviceId)
|
||||
));
|
||||
}
|
||||
|
||||
return writeItems;
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> storeEcSignedPreKeys(final UUID identifier, final Map<Byte, ECSignedPreKey> keys) {
|
||||
@@ -130,27 +159,17 @@ public class KeysManager {
|
||||
return pqPreKeys.getCount(identifier, deviceId);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID identifier) {
|
||||
return delete(identifier, false);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final boolean excludePrimaryDevice) {
|
||||
public CompletableFuture<Void> deleteSingleUsePreKeys(final UUID identifier) {
|
||||
return CompletableFuture.allOf(
|
||||
ecPreKeys.delete(identifier),
|
||||
pqPreKeys.delete(identifier),
|
||||
dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().deleteEcSignedPreKeys()
|
||||
? ecSignedPreKeys.delete(identifier, excludePrimaryDevice)
|
||||
: CompletableFuture.completedFuture(null),
|
||||
pqLastResortKeys.delete(identifier, excludePrimaryDevice));
|
||||
pqPreKeys.delete(identifier)
|
||||
);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID accountUuid, final byte deviceId) {
|
||||
public CompletableFuture<Void> deleteSingleUsePreKeys(final UUID accountUuid, final byte deviceId) {
|
||||
return CompletableFuture.allOf(
|
||||
ecPreKeys.delete(accountUuid, deviceId),
|
||||
pqPreKeys.delete(accountUuid, deviceId),
|
||||
dynamicConfigurationManager.getConfiguration().getEcPreKeyMigrationConfiguration().deleteEcSignedPreKeys()
|
||||
? ecSignedPreKeys.delete(accountUuid, deviceId)
|
||||
: CompletableFuture.completedFuture(null),
|
||||
pqLastResortKeys.delete(accountUuid, deviceId));
|
||||
pqPreKeys.delete(accountUuid, deviceId)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,7 +266,7 @@ public class MessagePersister implements Managed {
|
||||
clientPresenceManager.disconnectPresence(account.getUuid(), deviceToDelete.getId());
|
||||
CompletableFuture
|
||||
.allOf(
|
||||
keysManager.delete(account.getUuid(), deviceToDelete.getId()),
|
||||
keysManager.deleteSingleUsePreKeys(account.getUuid(), deviceToDelete.getId()),
|
||||
messagesManager.clear(account.getUuid(), deviceToDelete.getId()))
|
||||
.orTimeout((UNLINK_TIMEOUT.toSeconds() * 3) / 4, TimeUnit.SECONDS)
|
||||
.join();
|
||||
|
||||
@@ -15,10 +15,9 @@ import org.whispersystems.textsecuregcm.entities.SignedPreKey;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Delete;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Put;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
@@ -47,8 +46,6 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
|
||||
private final Timer storeSingleKeyTimer = Metrics.timer(MetricsUtil.name(getClass(), "storeSingleKey"));
|
||||
private final Timer storeKeyBatchTimer = Metrics.timer(MetricsUtil.name(getClass(), "storeKeyBatch"));
|
||||
private final Timer deleteForDeviceTimer = Metrics.timer(MetricsUtil.name(getClass(), "deleteForDevice"));
|
||||
private final Timer deleteForAccountTimer = Metrics.timer(MetricsUtil.name(getClass(), "deleteForAccount"));
|
||||
|
||||
private final String findKeyTimerName = MetricsUtil.name(getClass(), "findKey");
|
||||
|
||||
@@ -112,7 +109,7 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
.thenRun(() -> sample.stop(storeKeyBatchTimer));
|
||||
}
|
||||
|
||||
TransactWriteItem buildTransactWriteItem(final UUID identifier, final byte deviceId, final K preKey) {
|
||||
TransactWriteItem buildTransactWriteItemForInsertion(final UUID identifier, final byte deviceId, final K preKey) {
|
||||
return TransactWriteItem.builder()
|
||||
.put(Put.builder()
|
||||
.tableName(tableName)
|
||||
@@ -121,6 +118,15 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
.build();
|
||||
}
|
||||
|
||||
public TransactWriteItem buildTransactWriteItemForDeletion(final UUID identifier, final byte deviceId) {
|
||||
return TransactWriteItem.builder()
|
||||
.delete(Delete.builder()
|
||||
.tableName(tableName)
|
||||
.key(getPrimaryKey(identifier, deviceId))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds a repeated-use pre-key for a specific device.
|
||||
*
|
||||
@@ -147,52 +153,6 @@ public abstract class RepeatedUseSignedPreKeyStore<K extends SignedPreKey<?>> {
|
||||
return findFuture;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears all repeated-use pre-keys associated with the given account/identity.
|
||||
*
|
||||
* @param identifier the identifier for the account/identity for which to clear repeated-use pre-keys
|
||||
* @param excludePrimaryDevice whether to exclude the primary device from repeated-use key deletion; this is intended
|
||||
* for cases when a user "re-registers" and displaces an existing account record and has
|
||||
* provided new repeated-use keys for the primary device in the process of creating the
|
||||
* new account
|
||||
*
|
||||
* @return a future that completes once repeated-use pre-keys have been cleared from all devices associated with the
|
||||
* target account/identity
|
||||
*/
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final boolean excludePrimaryDevice) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return getDeviceIdsWithKeys(identifier)
|
||||
.filter(deviceId -> deviceId != Device.PRIMARY_ID || !excludePrimaryDevice)
|
||||
.map(deviceId -> DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(getPrimaryKey(identifier, deviceId))
|
||||
.build())
|
||||
.flatMap(deleteItemRequest -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest)))
|
||||
// Idiom: wait for everything to finish, but discard the results
|
||||
.reduce(0, (a, b) -> 0)
|
||||
.toFuture()
|
||||
.thenRun(() -> sample.stop(deleteForAccountTimer));
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the repeated-use pre-key associated with a specific device.
|
||||
*
|
||||
* @param identifier the identifier for the account/identity with which the target device is associated
|
||||
* @param deviceId the identifier for the device within the given account/identity
|
||||
*
|
||||
* @return a future that completes once the repeated-use pre-key has been removed from the target device
|
||||
*/
|
||||
public CompletableFuture<Void> delete(final UUID identifier, final byte deviceId) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(getPrimaryKey(identifier, deviceId))
|
||||
.build())
|
||||
.thenRun(() -> sample.stop(deleteForDeviceTimer));
|
||||
}
|
||||
|
||||
public Flux<Byte> getDeviceIdsWithKeys(final UUID identifier) {
|
||||
return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
|
||||
.tableName(tableName)
|
||||
|
||||
Reference in New Issue
Block a user