mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 05:58:00 +01:00
Make username-related operations asynchronous
This commit is contained in:
@@ -22,6 +22,7 @@ import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HEAD;
|
||||
import javax.ws.rs.HeaderParam;
|
||||
import javax.ws.rs.NotFoundException;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
@@ -61,6 +62,7 @@ import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernameHashNotAvailableException;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernameReservationNotFoundException;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import org.whispersystems.textsecuregcm.util.HeaderUtils;
|
||||
import org.whispersystems.textsecuregcm.util.UsernameHashZkProofVerifier;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
@@ -271,9 +273,10 @@ public class AccountController {
|
||||
)
|
||||
@ApiResponse(responseCode = "204", description = "Username successfully deleted.", useReturnTypeSchema = true)
|
||||
@ApiResponse(responseCode = "401", description = "Account authentication check failed.")
|
||||
public void deleteUsernameHash(@Auth final AuthenticatedAccount auth) {
|
||||
public CompletableFuture<Void> deleteUsernameHash(@Auth final AuthenticatedAccount auth) {
|
||||
clearUsernameLink(auth.getAccount());
|
||||
accounts.clearUsernameHash(auth.getAccount());
|
||||
return accounts.clearUsernameHash(auth.getAccount())
|
||||
.thenRun(Util.NOOP);
|
||||
}
|
||||
|
||||
@PUT
|
||||
@@ -292,7 +295,7 @@ public class AccountController {
|
||||
@ApiResponse(responseCode = "409", description = "All username hashes from the list are taken.")
|
||||
@ApiResponse(responseCode = "422", description = "Invalid request format.")
|
||||
@ApiResponse(responseCode = "429", description = "Ratelimited.")
|
||||
public ReserveUsernameHashResponse reserveUsernameHash(
|
||||
public CompletableFuture<ReserveUsernameHashResponse> reserveUsernameHash(
|
||||
@Auth final AuthenticatedAccount auth,
|
||||
@NotNull @Valid final ReserveUsernameHashRequest usernameRequest) throws RateLimitExceededException {
|
||||
|
||||
@@ -304,15 +307,15 @@ public class AccountController {
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
final AccountsManager.UsernameReservation reservation = accounts.reserveUsernameHash(
|
||||
auth.getAccount(),
|
||||
usernameRequest.usernameHashes()
|
||||
);
|
||||
return new ReserveUsernameHashResponse(reservation.reservedUsernameHash());
|
||||
} catch (final UsernameHashNotAvailableException e) {
|
||||
throw new WebApplicationException(Status.CONFLICT);
|
||||
}
|
||||
return accounts.reserveUsernameHash(auth.getAccount(), usernameRequest.usernameHashes())
|
||||
.thenApply(reservation -> new ReserveUsernameHashResponse(reservation.reservedUsernameHash()))
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof UsernameHashNotAvailableException) {
|
||||
throw new WebApplicationException(Status.CONFLICT);
|
||||
}
|
||||
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
});
|
||||
}
|
||||
|
||||
@PUT
|
||||
@@ -332,10 +335,9 @@ public class AccountController {
|
||||
@ApiResponse(responseCode = "410", description = "Username hash not available (username can't be used).")
|
||||
@ApiResponse(responseCode = "422", description = "Invalid request format.")
|
||||
@ApiResponse(responseCode = "429", description = "Ratelimited.")
|
||||
public UsernameHashResponse confirmUsernameHash(
|
||||
public CompletableFuture<UsernameHashResponse> confirmUsernameHash(
|
||||
@Auth final AuthenticatedAccount auth,
|
||||
@NotNull @Valid final ConfirmUsernameHashRequest confirmRequest) throws RateLimitExceededException {
|
||||
rateLimiters.getUsernameSetLimiter().validate(auth.getAccount().getUuid());
|
||||
@NotNull @Valid final ConfirmUsernameHashRequest confirmRequest) {
|
||||
|
||||
try {
|
||||
usernameHashZkProofVerifier.verifyProof(confirmRequest.zkProof(), confirmRequest.usernameHash());
|
||||
@@ -343,20 +345,26 @@ public class AccountController {
|
||||
throw new WebApplicationException(Response.status(422).build());
|
||||
}
|
||||
|
||||
try {
|
||||
final Account account = accounts.confirmReservedUsernameHash(
|
||||
auth.getAccount(),
|
||||
confirmRequest.usernameHash(),
|
||||
confirmRequest.encryptedUsername());
|
||||
final UUID linkHandle = account.getUsernameLinkHandle();
|
||||
return new UsernameHashResponse(
|
||||
account.getUsernameHash().orElseThrow(() -> new IllegalStateException("Could not get username after setting")),
|
||||
linkHandle);
|
||||
} catch (final UsernameReservationNotFoundException e) {
|
||||
throw new WebApplicationException(Status.CONFLICT);
|
||||
} catch (final UsernameHashNotAvailableException e) {
|
||||
throw new WebApplicationException(Status.GONE);
|
||||
}
|
||||
return rateLimiters.getUsernameSetLimiter().validateAsync(auth.getAccount().getUuid())
|
||||
.thenCompose(ignored -> accounts.confirmReservedUsernameHash(
|
||||
auth.getAccount(),
|
||||
confirmRequest.usernameHash(),
|
||||
confirmRequest.encryptedUsername()))
|
||||
.thenApply(updatedAccount -> new UsernameHashResponse(updatedAccount.getUsernameHash()
|
||||
.orElseThrow(() -> new IllegalStateException("Could not get username after setting")),
|
||||
updatedAccount.getUsernameLinkHandle()))
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof UsernameReservationNotFoundException) {
|
||||
throw new WebApplicationException(Status.CONFLICT);
|
||||
}
|
||||
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof UsernameHashNotAvailableException) {
|
||||
throw new WebApplicationException(Status.GONE);
|
||||
}
|
||||
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
})
|
||||
.toCompletableFuture();
|
||||
}
|
||||
|
||||
@GET
|
||||
@@ -372,9 +380,9 @@ public class AccountController {
|
||||
@ApiResponse(responseCode = "200", description = "Account found for the given username.", useReturnTypeSchema = true)
|
||||
@ApiResponse(responseCode = "400", description = "Request must not be authenticated.")
|
||||
@ApiResponse(responseCode = "404", description = "Account not fount for the given username.")
|
||||
public AccountIdentifierResponse lookupUsernameHash(
|
||||
public CompletableFuture<AccountIdentifierResponse> lookupUsernameHash(
|
||||
@Auth final Optional<AuthenticatedAccount> maybeAuthenticatedAccount,
|
||||
@PathParam("usernameHash") final String usernameHash) throws RateLimitExceededException {
|
||||
@PathParam("usernameHash") final String usernameHash) {
|
||||
|
||||
requireNotAuthenticated(maybeAuthenticatedAccount);
|
||||
final byte[] hash;
|
||||
@@ -388,12 +396,10 @@ public class AccountController {
|
||||
throw new WebApplicationException(Response.status(422).build());
|
||||
}
|
||||
|
||||
return accounts
|
||||
.getByUsernameHash(hash)
|
||||
.map(Account::getUuid)
|
||||
return accounts.getByUsernameHash(hash).thenApply(maybeAccount -> maybeAccount.map(Account::getUuid)
|
||||
.map(AciServiceIdentifier::new)
|
||||
.map(AccountIdentifierResponse::new)
|
||||
.orElseThrow(() -> new WebApplicationException(Status.NOT_FOUND));
|
||||
.orElseThrow(() -> new WebApplicationException(Status.NOT_FOUND)));
|
||||
}
|
||||
|
||||
@PUT
|
||||
@@ -464,16 +470,16 @@ public class AccountController {
|
||||
@ApiResponse(responseCode = "404", description = "Username link was not found for the given handle.")
|
||||
@ApiResponse(responseCode = "422", description = "Invalid request format.")
|
||||
@ApiResponse(responseCode = "429", description = "Ratelimited.")
|
||||
public EncryptedUsername lookupUsernameLink(
|
||||
public CompletableFuture<EncryptedUsername> lookupUsernameLink(
|
||||
@Auth final Optional<AuthenticatedAccount> maybeAuthenticatedAccount,
|
||||
@PathParam("uuid") final UUID usernameLinkHandle) {
|
||||
final Optional<byte[]> maybeEncryptedUsername = accounts.getByUsernameLinkHandle(usernameLinkHandle)
|
||||
.flatMap(Account::getEncryptedUsername);
|
||||
|
||||
requireNotAuthenticated(maybeAuthenticatedAccount);
|
||||
if (maybeEncryptedUsername.isEmpty()) {
|
||||
throw new WebApplicationException(Status.NOT_FOUND);
|
||||
}
|
||||
return new EncryptedUsername(maybeEncryptedUsername.get());
|
||||
|
||||
return accounts.getByUsernameLinkHandle(usernameLinkHandle)
|
||||
.thenApply(maybeAccount -> maybeAccount.flatMap(Account::getEncryptedUsername)
|
||||
.map(EncryptedUsername::new)
|
||||
.orElseThrow(NotFoundException::new));
|
||||
}
|
||||
|
||||
@Operation(
|
||||
|
||||
@@ -28,7 +28,6 @@ import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nonnull;
|
||||
@@ -346,75 +345,83 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
/**
|
||||
* Reserve a username hash under the account UUID
|
||||
*/
|
||||
public void reserveUsernameHash(
|
||||
public CompletableFuture<Void> reserveUsernameHash(
|
||||
final Account account,
|
||||
final byte[] reservedUsernameHash,
|
||||
final Duration ttl) {
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
// if there is an existing old reservation it will be cleaned up via ttl
|
||||
final Optional<byte[]> maybeOriginalReservation = account.getReservedUsernameHash();
|
||||
account.setReservedUsernameHash(reservedUsernameHash);
|
||||
|
||||
boolean succeeded = false;
|
||||
|
||||
final long expirationTime = clock.instant().plus(ttl).getEpochSecond();
|
||||
|
||||
// Use account UUID as a "reservation token" - by providing this, the client proves ownership of the hash
|
||||
final UUID uuid = account.getUuid();
|
||||
final byte[] accountJsonBytes;
|
||||
|
||||
try {
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>();
|
||||
|
||||
writeItems.add(TransactWriteItem.builder()
|
||||
.put(Put.builder()
|
||||
.tableName(usernamesConstraintTableName)
|
||||
.item(Map.of(
|
||||
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
|
||||
ATTR_USERNAME_HASH, AttributeValues.fromByteArray(reservedUsernameHash),
|
||||
ATTR_TTL, AttributeValues.fromLong(expirationTime),
|
||||
ATTR_CONFIRMED, AttributeValues.fromBool(false)))
|
||||
.conditionExpression("attribute_not_exists(#username_hash) OR (#ttl < :now)")
|
||||
.expressionAttributeNames(Map.of("#username_hash", ATTR_USERNAME_HASH, "#ttl", ATTR_TTL))
|
||||
.expressionAttributeValues(Map.of(":now", AttributeValues.fromLong(clock.instant().getEpochSecond())))
|
||||
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
|
||||
.build())
|
||||
.build());
|
||||
|
||||
writeItems.add(
|
||||
TransactWriteItem.builder()
|
||||
.update(Update.builder()
|
||||
.tableName(accountsTableName)
|
||||
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
|
||||
.updateExpression("SET #data = :data ADD #version :version_increment")
|
||||
.conditionExpression("#version = :version")
|
||||
.expressionAttributeNames(Map.of("#data", ATTR_ACCOUNT_DATA, "#version", ATTR_VERSION))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":data", accountDataAttributeValue(account),
|
||||
":version", AttributeValues.fromInt(account.getVersion()),
|
||||
":version_increment", AttributeValues.fromInt(1)))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
|
||||
.transactItems(writeItems)
|
||||
.build();
|
||||
|
||||
db().transactWriteItems(request);
|
||||
|
||||
account.setVersion(account.getVersion() + 1);
|
||||
succeeded = true;
|
||||
accountJsonBytes = SystemMapper.jsonMapper().writeValueAsBytes(account);
|
||||
} catch (final JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
} catch (final TransactionCanceledException e) {
|
||||
if (e.cancellationReasons().stream().map(CancellationReason::code).anyMatch(CONDITIONAL_CHECK_FAILED::equals)) {
|
||||
throw new ContestedOptimisticLockException();
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
if (!succeeded) {
|
||||
account.setReservedUsernameHash(maybeOriginalReservation.orElse(null));
|
||||
}
|
||||
RESERVE_USERNAME_TIMER.record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>();
|
||||
|
||||
writeItems.add(TransactWriteItem.builder()
|
||||
.put(Put.builder()
|
||||
.tableName(usernamesConstraintTableName)
|
||||
.item(Map.of(
|
||||
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
|
||||
ATTR_USERNAME_HASH, AttributeValues.fromByteArray(reservedUsernameHash),
|
||||
ATTR_TTL, AttributeValues.fromLong(expirationTime),
|
||||
ATTR_CONFIRMED, AttributeValues.fromBool(false)))
|
||||
.conditionExpression("attribute_not_exists(#username_hash) OR (#ttl < :now)")
|
||||
.expressionAttributeNames(Map.of("#username_hash", ATTR_USERNAME_HASH, "#ttl", ATTR_TTL))
|
||||
.expressionAttributeValues(Map.of(":now", AttributeValues.fromLong(clock.instant().getEpochSecond())))
|
||||
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
|
||||
.build())
|
||||
.build());
|
||||
|
||||
writeItems.add(
|
||||
TransactWriteItem.builder()
|
||||
.update(Update.builder()
|
||||
.tableName(accountsTableName)
|
||||
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(uuid)))
|
||||
.updateExpression("SET #data = :data ADD #version :version_increment")
|
||||
.conditionExpression("#version = :version")
|
||||
.expressionAttributeNames(Map.of("#data", ATTR_ACCOUNT_DATA, "#version", ATTR_VERSION))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":data", AttributeValues.fromByteArray(accountJsonBytes),
|
||||
":version", AttributeValues.fromInt(account.getVersion()),
|
||||
":version_increment", AttributeValues.fromInt(1)))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
||||
.transactItems(writeItems)
|
||||
.build())
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException e) {
|
||||
if (e.cancellationReasons().stream().map(CancellationReason::code).anyMatch(CONDITIONAL_CHECK_FAILED::equals)) {
|
||||
throw new ContestedOptimisticLockException();
|
||||
}
|
||||
}
|
||||
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
})
|
||||
.whenComplete((response, throwable) -> {
|
||||
sample.stop(RESERVE_USERNAME_TIMER);
|
||||
|
||||
if (throwable == null) {
|
||||
account.setVersion(account.getVersion() + 1);
|
||||
} else {
|
||||
account.setReservedUsernameHash(maybeOriginalReservation.orElse(null));
|
||||
}
|
||||
})
|
||||
.thenRun(() -> {});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -422,22 +429,24 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
*
|
||||
* @param account to update
|
||||
* @param usernameHash believed to be available
|
||||
* @throws ContestedOptimisticLockException if the account has been updated or the username has taken by someone else
|
||||
* @return a future that completes once the username hash has been confirmed; may fail with an
|
||||
* {@link ContestedOptimisticLockException} if the account has been updated or the username has taken by someone else
|
||||
*/
|
||||
public void confirmUsernameHash(final Account account, final byte[] usernameHash, @Nullable final byte[] encryptedUsername)
|
||||
throws ContestedOptimisticLockException {
|
||||
final long startNanos = System.nanoTime();
|
||||
public CompletableFuture<Void> confirmUsernameHash(final Account account, final byte[] usernameHash, @Nullable final byte[] encryptedUsername) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
final Optional<byte[]> maybeOriginalUsernameHash = account.getUsernameHash();
|
||||
final Optional<byte[]> maybeOriginalReservationHash = account.getReservedUsernameHash();
|
||||
final Optional<UUID> maybeOriginalUsernameLinkHandle = Optional.ofNullable(account.getUsernameLinkHandle());
|
||||
final Optional<byte[]> maybeOriginalEncryptedUsername = account.getEncryptedUsername();
|
||||
|
||||
final UUID newLinkHandle = UUID.randomUUID();
|
||||
|
||||
account.setUsernameHash(usernameHash);
|
||||
account.setReservedUsernameHash(null);
|
||||
account.setUsernameLinkDetails(encryptedUsername == null ? null : UUID.randomUUID(), encryptedUsername);
|
||||
account.setUsernameLinkDetails(encryptedUsername == null ? null : newLinkHandle, encryptedUsername);
|
||||
|
||||
boolean succeeded = false;
|
||||
final TransactWriteItemsRequest request;
|
||||
|
||||
try {
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>();
|
||||
@@ -493,83 +502,92 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
maybeOriginalUsernameHash.ifPresent(originalUsernameHash -> writeItems.add(
|
||||
buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, originalUsernameHash)));
|
||||
|
||||
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
|
||||
request = TransactWriteItemsRequest.builder()
|
||||
.transactItems(writeItems)
|
||||
.build();
|
||||
|
||||
db().transactWriteItems(request);
|
||||
|
||||
account.setVersion(account.getVersion() + 1);
|
||||
succeeded = true;
|
||||
} catch (final JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
} catch (final TransactionCanceledException e) {
|
||||
if (e.cancellationReasons().stream().map(CancellationReason::code).anyMatch(CONDITIONAL_CHECK_FAILED::equals)) {
|
||||
throw new ContestedOptimisticLockException();
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
if (!succeeded) {
|
||||
account.setUsernameHash(maybeOriginalUsernameHash.orElse(null));
|
||||
account.setReservedUsernameHash(maybeOriginalReservationHash.orElse(null));
|
||||
account.setUsernameLinkDetails(maybeOriginalUsernameLinkHandle.orElse(null), maybeOriginalEncryptedUsername.orElse(null));
|
||||
}
|
||||
SET_USERNAME_TIMER.record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
|
||||
account.setUsernameLinkDetails(maybeOriginalUsernameLinkHandle.orElse(null), maybeOriginalEncryptedUsername.orElse(null));
|
||||
account.setReservedUsernameHash(maybeOriginalReservationHash.orElse(null));
|
||||
account.setUsernameHash(maybeOriginalUsernameHash.orElse(null));
|
||||
}
|
||||
}
|
||||
|
||||
public void clearUsernameHash(final Account account) {
|
||||
account.getUsernameHash().ifPresent(usernameHash -> {
|
||||
CLEAR_USERNAME_HASH_TIMER.record(() -> {
|
||||
account.setUsernameHash(null);
|
||||
|
||||
boolean succeeded = false;
|
||||
|
||||
try {
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>();
|
||||
|
||||
writeItems.add(
|
||||
TransactWriteItem.builder()
|
||||
.update(Update.builder()
|
||||
.tableName(accountsTableName)
|
||||
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid())))
|
||||
.updateExpression("SET #data = :data REMOVE #username_hash ADD #version :version_increment")
|
||||
.conditionExpression("#version = :version")
|
||||
.expressionAttributeNames(Map.of("#data", ATTR_ACCOUNT_DATA,
|
||||
"#username_hash", ATTR_USERNAME_HASH,
|
||||
"#version", ATTR_VERSION))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":data", accountDataAttributeValue(account),
|
||||
":version", AttributeValues.fromInt(account.getVersion()),
|
||||
":version_increment", AttributeValues.fromInt(1)))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
writeItems.add(buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash));
|
||||
|
||||
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
|
||||
.transactItems(writeItems)
|
||||
.build();
|
||||
|
||||
db().transactWriteItems(request);
|
||||
return asyncClient.transactWriteItems(request)
|
||||
.thenRun(() -> {
|
||||
account.setUsernameHash(usernameHash);
|
||||
account.setReservedUsernameHash(null);
|
||||
account.setUsernameLinkDetails(encryptedUsername == null ? null : newLinkHandle, encryptedUsername);
|
||||
|
||||
account.setVersion(account.getVersion() + 1);
|
||||
succeeded = true;
|
||||
} catch (final JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
} catch (final TransactionCanceledException e) {
|
||||
if (conditionalCheckFailed(e.cancellationReasons().get(0))) {
|
||||
throw new ContestedOptimisticLockException();
|
||||
})
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException transactionCanceledException) {
|
||||
if (transactionCanceledException.cancellationReasons().stream().map(CancellationReason::code).anyMatch(CONDITIONAL_CHECK_FAILED::equals)) {
|
||||
throw new ContestedOptimisticLockException();
|
||||
}
|
||||
}
|
||||
|
||||
throw e;
|
||||
} finally {
|
||||
if (!succeeded) {
|
||||
account.setUsernameHash(usernameHash);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
})
|
||||
.whenComplete((ignored, throwable) -> sample.stop(SET_USERNAME_TIMER));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> clearUsernameHash(final Account account) {
|
||||
return account.getUsernameHash().map(usernameHash -> {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
final TransactWriteItemsRequest request;
|
||||
|
||||
try {
|
||||
final List<TransactWriteItem> writeItems = new ArrayList<>();
|
||||
|
||||
account.setUsernameHash(null);
|
||||
|
||||
writeItems.add(
|
||||
TransactWriteItem.builder()
|
||||
.update(Update.builder()
|
||||
.tableName(accountsTableName)
|
||||
.key(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(account.getUuid())))
|
||||
.updateExpression("SET #data = :data REMOVE #username_hash ADD #version :version_increment")
|
||||
.conditionExpression("#version = :version")
|
||||
.expressionAttributeNames(Map.of("#data", ATTR_ACCOUNT_DATA,
|
||||
"#username_hash", ATTR_USERNAME_HASH,
|
||||
"#version", ATTR_VERSION))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":data", accountDataAttributeValue(account),
|
||||
":version", AttributeValues.fromInt(account.getVersion()),
|
||||
":version_increment", AttributeValues.fromInt(1)))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
writeItems.add(buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash));
|
||||
|
||||
request = TransactWriteItemsRequest.builder()
|
||||
.transactItems(writeItems)
|
||||
.build();
|
||||
} catch (final JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
} finally {
|
||||
account.setUsernameHash(usernameHash);
|
||||
}
|
||||
|
||||
return asyncClient.transactWriteItems(request)
|
||||
.thenAccept(ignored -> {
|
||||
account.setUsernameHash(null);
|
||||
account.setVersion(account.getVersion() + 1);
|
||||
})
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException transactionCanceledException) {
|
||||
if (conditionalCheckFailed(transactionCanceledException.cancellationReasons().get(0))) {
|
||||
throw new ContestedOptimisticLockException();
|
||||
}
|
||||
}
|
||||
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
})
|
||||
.whenComplete((ignored, throwable) -> sample.stop(CLEAR_USERNAME_HASH_TIMER));
|
||||
}).orElseGet(() -> CompletableFuture.completedFuture(null));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -655,29 +673,26 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean usernameHashAvailable(final byte[] username) {
|
||||
public CompletableFuture<Boolean> usernameHashAvailable(final byte[] username) {
|
||||
return usernameHashAvailable(Optional.empty(), username);
|
||||
}
|
||||
|
||||
public boolean usernameHashAvailable(final Optional<UUID> accountUuid, final byte[] usernameHash) {
|
||||
final Optional<Map<String, AttributeValue>> usernameHashItem = itemByKey(
|
||||
usernamesConstraintTableName, ATTR_USERNAME_HASH, AttributeValues.fromByteArray(usernameHash));
|
||||
public CompletableFuture<Boolean> usernameHashAvailable(final Optional<UUID> accountUuid, final byte[] usernameHash) {
|
||||
return itemByKeyAsync(usernamesConstraintTableName, ATTR_USERNAME_HASH, AttributeValues.fromByteArray(usernameHash))
|
||||
.thenApply(maybeUsernameHashItem -> maybeUsernameHashItem
|
||||
.map(item -> {
|
||||
if (AttributeValues.getLong(item, ATTR_TTL, Long.MAX_VALUE) < clock.instant().getEpochSecond()) {
|
||||
// username hash was reserved, but has expired
|
||||
return true;
|
||||
}
|
||||
|
||||
if (usernameHashItem.isEmpty()) {
|
||||
// username hash is free
|
||||
return true;
|
||||
}
|
||||
final Map<String, AttributeValue> item = usernameHashItem.get();
|
||||
|
||||
if (AttributeValues.getLong(item, ATTR_TTL, Long.MAX_VALUE) < clock.instant().getEpochSecond()) {
|
||||
// username hash was reserved, but has expired
|
||||
return true;
|
||||
}
|
||||
|
||||
// username hash is reserved by us
|
||||
return !AttributeValues.getBool(item, ATTR_CONFIRMED, true) && accountUuid
|
||||
.map(AttributeValues.getUUID(item, KEY_ACCOUNT_UUID, new UUID(0, 0))::equals)
|
||||
.orElse(false);
|
||||
// username hash is reserved by us
|
||||
return !AttributeValues.getBool(item, ATTR_CONFIRMED, true) && accountUuid
|
||||
.map(AttributeValues.getUUID(item, KEY_ACCOUNT_UUID, new UUID(0, 0))::equals)
|
||||
.orElse(false);
|
||||
})
|
||||
// If no item was found, then the username hash is free
|
||||
.orElse(true));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -704,9 +719,8 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public Optional<Account> getByUsernameHash(final byte[] usernameHash) {
|
||||
return getByIndirectLookup(
|
||||
GET_BY_USERNAME_HASH_TIMER,
|
||||
public CompletableFuture<Optional<Account>> getByUsernameHash(final byte[] usernameHash) {
|
||||
return getByIndirectLookupAsync(GET_BY_USERNAME_HASH_TIMER,
|
||||
usernamesConstraintTableName,
|
||||
ATTR_USERNAME_HASH,
|
||||
AttributeValues.fromByteArray(usernameHash),
|
||||
@@ -715,10 +729,12 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public Optional<Account> getByUsernameLinkHandle(final UUID usernameLinkHandle) {
|
||||
return requireNonNull(GET_BY_USERNAME_LINK_HANDLE_TIMER.record(() ->
|
||||
itemByGsiKey(accountsTableName, USERNAME_LINK_TO_UUID_INDEX, ATTR_USERNAME_LINK_UUID, AttributeValues.fromUUID(usernameLinkHandle))
|
||||
.map(Accounts::fromItem)));
|
||||
public CompletableFuture<Optional<Account>> getByUsernameLinkHandle(final UUID usernameLinkHandle) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
return itemByGsiKeyAsync(accountsTableName, USERNAME_LINK_TO_UUID_INDEX, ATTR_USERNAME_LINK_UUID, AttributeValues.fromUUID(usernameLinkHandle))
|
||||
.thenApply(maybeItem -> maybeItem.map(Accounts::fromItem))
|
||||
.whenComplete((account, throwable) -> sample.stop(GET_BY_USERNAME_LINK_HANDLE_TIMER));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -945,6 +961,35 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
return itemByKey(table, KEY_ACCOUNT_UUID, primaryKeyValue);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private CompletableFuture<Optional<Map<String, AttributeValue>>> itemByGsiKeyAsync(final String table, final String indexName, final String keyName, final AttributeValue keyValue) {
|
||||
return asyncClient.query(QueryRequest.builder()
|
||||
.tableName(table)
|
||||
.indexName(indexName)
|
||||
.keyConditionExpression("#gsiKey = :gsiValue")
|
||||
.projectionExpression("#uuid")
|
||||
.expressionAttributeNames(Map.of(
|
||||
"#gsiKey", keyName,
|
||||
"#uuid", KEY_ACCOUNT_UUID))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":gsiValue", keyValue))
|
||||
.build())
|
||||
.thenCompose(response -> {
|
||||
if (response.count() == 0) {
|
||||
return CompletableFuture.completedFuture(Optional.empty());
|
||||
}
|
||||
|
||||
if (response.count() > 1) {
|
||||
return CompletableFuture.failedFuture(new IllegalStateException(
|
||||
"More than one row located for GSI [%s], key-value pair [%s, %s]"
|
||||
.formatted(indexName, keyName, keyValue)));
|
||||
}
|
||||
|
||||
final AttributeValue primaryKeyValue = response.items().get(0).get(KEY_ACCOUNT_UUID);
|
||||
return itemByKeyAsync(table, KEY_ACCOUNT_UUID, primaryKeyValue);
|
||||
});
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private TransactWriteItem buildAccountPut(
|
||||
final Account account,
|
||||
|
||||
@@ -23,15 +23,18 @@ import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.Queue;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
@@ -454,42 +457,46 @@ public class AccountsManager {
|
||||
*
|
||||
* @param account the account to update
|
||||
* @param requestedUsernameHashes the list of username hashes to attempt to reserve
|
||||
* @return the reserved username hash and an updated Account object
|
||||
* @throws UsernameHashNotAvailableException no username hash is available
|
||||
* @return a future that yields the reserved username hash and an updated Account object on success; may fail with a
|
||||
* {@link UsernameHashNotAvailableException} if none of the given username hashes are available
|
||||
*/
|
||||
public UsernameReservation reserveUsernameHash(final Account account, final List<byte[]> requestedUsernameHashes) throws UsernameHashNotAvailableException {
|
||||
public CompletableFuture<UsernameReservation> reserveUsernameHash(final Account account, final List<byte[]> requestedUsernameHashes) {
|
||||
if (!experimentEnrollmentManager.isEnrolled(account.getUuid(), USERNAME_EXPERIMENT_NAME)) {
|
||||
throw new UsernameHashNotAvailableException();
|
||||
return CompletableFuture.failedFuture(new UsernameHashNotAvailableException());
|
||||
}
|
||||
|
||||
redisDelete(account);
|
||||
final AtomicReference<byte[]> reservedUsernameHash = new AtomicReference<>();
|
||||
|
||||
class Reserver implements AccountPersister {
|
||||
byte[] reservedUsernameHash;
|
||||
return redisDeleteAsync(account)
|
||||
.thenCompose(ignored -> updateWithRetriesAsync(
|
||||
account,
|
||||
a -> true,
|
||||
a -> checkAndReserveNextUsernameHash(a, new ArrayDeque<>(requestedUsernameHashes))
|
||||
.thenAccept(reservedUsernameHash::set),
|
||||
() -> accounts.getByAccountIdentifierAsync(account.getUuid()).thenApply(Optional::orElseThrow),
|
||||
AccountChangeValidator.USERNAME_CHANGE_VALIDATOR,
|
||||
MAX_UPDATE_ATTEMPTS))
|
||||
.thenApply(updatedAccount -> new UsernameReservation(updatedAccount, reservedUsernameHash.get()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void persistAccount(final Account account) throws UsernameHashNotAvailableException {
|
||||
for (byte[] usernameHash : requestedUsernameHashes) {
|
||||
if (accounts.usernameHashAvailable(usernameHash)) {
|
||||
reservedUsernameHash = usernameHash;
|
||||
accounts.reserveUsernameHash(
|
||||
account,
|
||||
usernameHash,
|
||||
USERNAME_HASH_RESERVATION_TTL_MINUTES);
|
||||
return;
|
||||
private CompletableFuture<byte[]> checkAndReserveNextUsernameHash(final Account account, final Queue<byte[]> requestedUsernameHashes) {
|
||||
final byte[] usernameHash;
|
||||
|
||||
try {
|
||||
usernameHash = requestedUsernameHashes.remove();
|
||||
} catch (final NoSuchElementException e) {
|
||||
return CompletableFuture.failedFuture(new UsernameHashNotAvailableException());
|
||||
}
|
||||
|
||||
return accounts.usernameHashAvailable(usernameHash)
|
||||
.thenCompose(usernameHashAvailable -> {
|
||||
if (usernameHashAvailable) {
|
||||
return accounts.reserveUsernameHash(account, usernameHash, USERNAME_HASH_RESERVATION_TTL_MINUTES)
|
||||
.thenApply(ignored -> usernameHash);
|
||||
} else {
|
||||
return checkAndReserveNextUsernameHash(account, requestedUsernameHashes);
|
||||
}
|
||||
}
|
||||
throw new UsernameHashNotAvailableException();
|
||||
}
|
||||
}
|
||||
final Reserver reserver = new Reserver();
|
||||
final Account updatedAccount = failableUpdateWithRetries(
|
||||
account,
|
||||
a -> true,
|
||||
reserver,
|
||||
() -> accounts.getByAccountIdentifier(account.getUuid()).orElseThrow(),
|
||||
AccountChangeValidator.USERNAME_CHANGE_VALIDATOR);
|
||||
return new UsernameReservation(updatedAccount, reserver.reservedUsernameHash);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -498,50 +505,54 @@ public class AccountsManager {
|
||||
* @param account the account to update
|
||||
* @param reservedUsernameHash the previously reserved username hash
|
||||
* @param encryptedUsername the encrypted form of the previously reserved username for the username link
|
||||
* @return the updated account with the username hash field set
|
||||
* @throws UsernameHashNotAvailableException if the reserved username hash has been taken (because the reservation expired)
|
||||
* @throws UsernameReservationNotFoundException if `reservedUsernameHash` was not reserved in the account
|
||||
* @return a future that yields the updated account with the username hash field set; may fail with a
|
||||
* {@link UsernameHashNotAvailableException} if the reserved username hash has been taken (because the reservation
|
||||
* expired) or a {@link UsernameReservationNotFoundException} if {@code reservedUsernameHash} was not reserved in the
|
||||
* account
|
||||
*/
|
||||
public Account confirmReservedUsernameHash(final Account account, final byte[] reservedUsernameHash, @Nullable final byte[] encryptedUsername) throws UsernameHashNotAvailableException, UsernameReservationNotFoundException {
|
||||
public CompletableFuture<Account> confirmReservedUsernameHash(final Account account, final byte[] reservedUsernameHash, @Nullable final byte[] encryptedUsername) {
|
||||
if (!experimentEnrollmentManager.isEnrolled(account.getUuid(), USERNAME_EXPERIMENT_NAME)) {
|
||||
throw new UsernameHashNotAvailableException();
|
||||
return CompletableFuture.failedFuture(new UsernameHashNotAvailableException());
|
||||
}
|
||||
|
||||
if (account.getUsernameHash().map(currentUsernameHash -> Arrays.equals(currentUsernameHash, reservedUsernameHash)).orElse(false)) {
|
||||
// the client likely already succeeded and is retrying
|
||||
return account;
|
||||
return CompletableFuture.completedFuture(account);
|
||||
}
|
||||
|
||||
if (!account.getReservedUsernameHash().map(oldHash -> Arrays.equals(oldHash, reservedUsernameHash)).orElse(false)) {
|
||||
// no such reservation existed, either there was no previous call to reserveUsername
|
||||
// or the reservation changed
|
||||
throw new UsernameReservationNotFoundException();
|
||||
return CompletableFuture.failedFuture(new UsernameReservationNotFoundException());
|
||||
}
|
||||
|
||||
redisDelete(account);
|
||||
return redisDeleteAsync(account)
|
||||
.thenCompose(ignored -> updateWithRetriesAsync(
|
||||
account,
|
||||
a -> true,
|
||||
a -> accounts.usernameHashAvailable(Optional.of(account.getUuid()), reservedUsernameHash)
|
||||
.thenCompose(usernameHashAvailable -> {
|
||||
if (!usernameHashAvailable) {
|
||||
return CompletableFuture.failedFuture(new UsernameHashNotAvailableException());
|
||||
}
|
||||
|
||||
return failableUpdateWithRetries(
|
||||
account,
|
||||
a -> true,
|
||||
a -> {
|
||||
// though we know this username hash was reserved, the reservation could have lapsed
|
||||
if (!accounts.usernameHashAvailable(Optional.of(account.getUuid()), reservedUsernameHash)) {
|
||||
throw new UsernameHashNotAvailableException();
|
||||
}
|
||||
accounts.confirmUsernameHash(a, reservedUsernameHash, encryptedUsername);
|
||||
},
|
||||
() -> accounts.getByAccountIdentifier(account.getUuid()).orElseThrow(),
|
||||
AccountChangeValidator.USERNAME_CHANGE_VALIDATOR);
|
||||
return accounts.confirmUsernameHash(a, reservedUsernameHash, encryptedUsername);
|
||||
}),
|
||||
() -> accounts.getByAccountIdentifierAsync(account.getUuid()).thenApply(Optional::orElseThrow),
|
||||
AccountChangeValidator.USERNAME_CHANGE_VALIDATOR,
|
||||
MAX_UPDATE_ATTEMPTS
|
||||
));
|
||||
}
|
||||
|
||||
public Account clearUsernameHash(final Account account) {
|
||||
redisDelete(account);
|
||||
|
||||
return updateWithRetries(
|
||||
account,
|
||||
a -> true,
|
||||
accounts::clearUsernameHash,
|
||||
() -> accounts.getByAccountIdentifier(account.getUuid()).orElseThrow(),
|
||||
AccountChangeValidator.USERNAME_CHANGE_VALIDATOR);
|
||||
public CompletableFuture<Account> clearUsernameHash(final Account account) {
|
||||
return redisDeleteAsync(account)
|
||||
.thenCompose(ignored -> updateWithRetriesAsync(
|
||||
account,
|
||||
a -> true,
|
||||
accounts::clearUsernameHash,
|
||||
() -> accounts.getByAccountIdentifierAsync(account.getUuid()).thenApply(Optional::orElseThrow),
|
||||
AccountChangeValidator.USERNAME_CHANGE_VALIDATOR,
|
||||
MAX_UPDATE_ATTEMPTS));
|
||||
}
|
||||
|
||||
public Account update(Account account, Consumer<Account> updater) {
|
||||
@@ -780,18 +791,18 @@ public class AccountsManager {
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<Account> getByUsernameLinkHandle(final UUID usernameLinkHandle) {
|
||||
return checkRedisThenAccounts(
|
||||
public CompletableFuture<Optional<Account>> getByUsernameLinkHandle(final UUID usernameLinkHandle) {
|
||||
return checkRedisThenAccountsAsync(
|
||||
getByUsernameLinkHandleTimer,
|
||||
() -> redisGetBySecondaryKey(getAccountMapKey(usernameLinkHandle.toString()), redisUsernameLinkHandleGetTimer),
|
||||
() -> redisGetBySecondaryKeyAsync(getAccountMapKey(usernameLinkHandle.toString()), redisUsernameLinkHandleGetTimer),
|
||||
() -> accounts.getByUsernameLinkHandle(usernameLinkHandle)
|
||||
);
|
||||
}
|
||||
|
||||
public Optional<Account> getByUsernameHash(final byte[] usernameHash) {
|
||||
return checkRedisThenAccounts(
|
||||
public CompletableFuture<Optional<Account>> getByUsernameHash(final byte[] usernameHash) {
|
||||
return checkRedisThenAccountsAsync(
|
||||
getByUsernameHashTimer,
|
||||
() -> redisGetBySecondaryKey(getUsernameHashAccountMapKey(usernameHash), redisUsernameHashGetTimer),
|
||||
() -> redisGetBySecondaryKeyAsync(getUsernameHashAccountMapKey(usernameHash), redisUsernameHashGetTimer),
|
||||
() -> accounts.getByUsernameHash(usernameHash)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import java.time.Clock;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@@ -51,6 +52,7 @@ import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernameHashNotAvailableException;
|
||||
import org.whispersystems.textsecuregcm.storage.UsernameReservationNotFoundException;
|
||||
import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
@@ -210,17 +212,23 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
|
||||
accountsManager.getByAccountIdentifier(accountIdentifier).ifPresentOrElse(account -> {
|
||||
try {
|
||||
final AccountsManager.UsernameReservation reservation = accountsManager.reserveUsernameHash(account,
|
||||
List.of(Base64.getUrlDecoder().decode(usernameHash)));
|
||||
List.of(Base64.getUrlDecoder().decode(usernameHash))).join();
|
||||
final Account result = accountsManager.confirmReservedUsernameHash(
|
||||
account,
|
||||
reservation.reservedUsernameHash(),
|
||||
encryptedUsername == null ? null : Base64.getUrlDecoder().decode(encryptedUsername));
|
||||
encryptedUsername == null ? null : Base64.getUrlDecoder().decode(encryptedUsername)).join();
|
||||
System.out.println("New username hash: " + Base64.getUrlEncoder().encodeToString(result.getUsernameHash().orElseThrow()));
|
||||
System.out.println("New username link handle: " + result.getUsernameLinkHandle().toString());
|
||||
} catch (final UsernameHashNotAvailableException e) {
|
||||
throw new IllegalArgumentException("Username hash already taken");
|
||||
} catch (final UsernameReservationNotFoundException e) {
|
||||
throw new IllegalArgumentException("Username hash reservation not found");
|
||||
} catch (final CompletionException e) {
|
||||
if (ExceptionUtils.unwrap(e) instanceof UsernameHashNotAvailableException) {
|
||||
throw new IllegalArgumentException("Username hash already taken");
|
||||
}
|
||||
|
||||
if (ExceptionUtils.unwrap(e) instanceof UsernameReservationNotFoundException) {
|
||||
throw new IllegalArgumentException("Username hash reservation not found");
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
},
|
||||
() -> {
|
||||
|
||||
Reference in New Issue
Block a user