Temporarily hold a username after an account releases it

This commit is contained in:
Ravi Khadiwala
2024-03-01 17:22:11 -06:00
committed by ravi-signal
parent 47b24b5dff
commit 3cc740cda3
7 changed files with 744 additions and 40 deletions

View File

@@ -238,6 +238,7 @@ import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.ProcessPushNotificationFeedbackCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredUsernameHoldsCommand;
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
@@ -297,6 +298,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
bootstrap.addCommand(new MessagePersisterServiceCommand());
bootstrap.addCommand(new RemoveExpiredAccountsCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredUsernameHoldsCommand(Clock.systemUTC()));
bootstrap.addCommand(new ProcessPushNotificationFeedbackCommand(Clock.systemUTC()));
bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand());
}

View File

@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -106,9 +107,14 @@ public class Account {
@JsonProperty
private int version;
@JsonProperty("holds")
private List<UsernameHold> usernameHolds = Collections.emptyList();
@JsonIgnore
private boolean stale;
public record UsernameHold(@JsonProperty("uh") byte[] usernameHash, @JsonProperty("e") long expirationSecs) {}
public UUID getIdentifier(final IdentityType identityType) {
return switch (identityType) {
case ACI -> getUuid();
@@ -525,6 +531,15 @@ public class Account {
devices.forEach(Device::lockAuthTokenHash);
}
public List<UsernameHold> getUsernameHolds() {
return Collections.unmodifiableList(usernameHolds);
}
public void setUsernameHolds(final List<UsernameHold> usernameHolds) {
this.requireNotStale();
this.usernameHolds = usernameHolds;
}
boolean isStale() {
return stale;
}

View File

@@ -18,6 +18,7 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -98,6 +99,7 @@ public class Accounts extends AbstractDynamoDbStore {
private static final Timer GET_BY_PNI_TIMER = Metrics.timer(name(Accounts.class, "getByPni"));
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(Accounts.class, "getByUuid"));
private static final Timer DELETE_TIMER = Metrics.timer(name(Accounts.class, "delete"));
private static final String USERNAME_HOLD_ADDED_COUNTER_NAME = name(Accounts.class, "usernameHoldAdded");
private static final String CONDITIONAL_CHECK_FAILED = "ConditionalCheckFailed";
@@ -132,6 +134,18 @@ public class Accounts extends AbstractDynamoDbStore {
static final Duration DELETED_ACCOUNTS_TIME_TO_LIVE = Duration.ofDays(30);
/**
* Maximum number of temporary username holds an account can have on recently used usernames
*/
@VisibleForTesting
static final int MAX_USERNAME_HOLDS = 3;
/**
* How long an old username is held for an account after the account initially clears/switches the username
*/
@VisibleForTesting
static final Duration USERNAME_HOLD_DURATION = Duration.ofDays(7);
private final Clock clock;
private final DynamoDbAsyncClient asyncClient;
@@ -456,6 +470,7 @@ public class Accounts extends AbstractDynamoDbStore {
});
}
/**
* Reserve a username hash under the account UUID
* @return a future that completes once the username hash has been reserved; may fail with an
@@ -470,14 +485,61 @@ public class Accounts extends AbstractDynamoDbStore {
final Timer.Sample sample = Timer.start();
// if there is an existing old reservation it will be cleaned up via ttl
// if there is an existing old reservation it will be cleaned up via ttl. Save it so we can restore it to the local
// account if the update fails though.
final Optional<byte[]> maybeOriginalReservation = account.getReservedUsernameHash();
account.setReservedUsernameHash(reservedUsernameHash);
// Normally when a username is reserved for the first time we reserve it for the provided TTL. But if the
// reservation is for a username that we already have a reservation for (for example, if it's reclaimable, or there
// is a hold) we might own that reservation for longer anyways, so we should preserve the original TTL in that case.
// What we'd really like to do is set expirationTime = max(oldExpirationTime, now + ttl), but dynamodb doesn't
// support that. Instead, we'll set expiration if it's greater than the existing expiration, otherwise retry
final long expirationTime = clock.instant().plus(ttl).getEpochSecond();
return tryReserveUsernameHash(account, reservedUsernameHash, expirationTime)
.exceptionallyCompose(ExceptionUtils.exceptionallyHandler(TtlConflictException.class, ttlConflict ->
// retry (once) with the returned expiration time
tryReserveUsernameHash(account, reservedUsernameHash, ttlConflict.getExistingExpirationSeconds())))
.whenComplete((response, throwable) -> {
sample.stop(RESERVE_USERNAME_TIMER);
if (throwable == null) {
account.setVersion(account.getVersion() + 1);
} else {
account.setReservedUsernameHash(maybeOriginalReservation.orElse(null));
}
});
}
private static class TtlConflictException extends ContestedOptimisticLockException {
private final long existingExpirationSeconds;
TtlConflictException(final long existingExpirationSeconds) {
super();
this.existingExpirationSeconds = existingExpirationSeconds;
}
long getExistingExpirationSeconds() {
return existingExpirationSeconds;
}
}
/**
* Try to reserve the provided usernameHash
*
* @param updatedAccount The account, already updated to reserve the provided usernameHash
* @param reservedUsernameHash The usernameHash to reserve
* @param expirationTimeSeconds When the reservation should expire
* @return A future that completes successfully if the usernameHash was reserved
* @throws TtlConflictException if the usernameHash was already reserved but with a longer TTL. The operation should
* be retried with the returned {@link TtlConflictException#getExistingExpirationSeconds()}
*/
private CompletableFuture<Void> tryReserveUsernameHash(
final Account updatedAccount,
final byte[] reservedUsernameHash,
final long expirationTimeSeconds) {
// Use account UUID as a "reservation token" - by providing this, the client proves ownership of the hash
final UUID uuid = account.getUuid();
final UUID uuid = updatedAccount.getUuid();
final List<TransactWriteItem> writeItems = new ArrayList<>();
@@ -487,10 +549,13 @@ public class Accounts extends AbstractDynamoDbStore {
.item(Map.of(
UsernameTable.ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
UsernameTable.KEY_USERNAME_HASH, AttributeValues.fromByteArray(reservedUsernameHash),
UsernameTable.ATTR_TTL, AttributeValues.fromLong(expirationTime),
UsernameTable.ATTR_TTL, AttributeValues.fromLong(expirationTimeSeconds),
UsernameTable.ATTR_CONFIRMED, AttributeValues.fromBool(false),
UsernameTable.ATTR_RECLAIMABLE, AttributeValues.fromBool(false)))
.conditionExpression("attribute_not_exists(#username_hash) OR #ttl < :now OR (#aci = :aci AND #confirmed = :confirmed)")
// we can make a reservation if no reservation exists for the name, or that reservation is expired, or there
// is a reservation but it's ours and we haven't confirmed it yet and we're not accidentally reducing our
// reservation's TTL. Note that confirmed=false => a TTL exists
.conditionExpression("attribute_not_exists(#username_hash) OR #ttl < :now OR (#aci = :aci AND #confirmed = :false AND #ttl <= :expirationTime)")
.expressionAttributeNames(Map.of(
"#username_hash", UsernameTable.KEY_USERNAME_HASH,
"#ttl", UsernameTable.ATTR_TTL,
@@ -499,37 +564,134 @@ public class Accounts extends AbstractDynamoDbStore {
.expressionAttributeValues(Map.of(
":now", AttributeValues.fromLong(clock.instant().getEpochSecond()),
":aci", AttributeValues.fromUUID(uuid),
":confirmed", AttributeValues.fromBool(false)))
":false", AttributeValues.fromBool(false),
":expirationTime", AttributeValues.fromLong(expirationTimeSeconds)))
.returnValuesOnConditionCheckFailure(ReturnValuesOnConditionCheckFailure.ALL_OLD)
.build())
.build());
writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, account).transactItem());
writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, updatedAccount).transactItem());
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
return asyncClient
.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
.thenRun(Util.NOOP)
.exceptionally(ExceptionUtils.exceptionallyHandler(TransactionCanceledException.class, e -> {
// If the constraint table update failed the condition check, the username's taken and we should stop
// trying. However, if the accounts table fails the conditional check or
// either table was concurrently updated, it's an optimistic locking failure and we should try again.
// trying. However,
if (conditionalCheckFailed(e.cancellationReasons().get(0))) {
// The constraint table update failed the condition check. It could be because the username was taken,
// or because we need to retry with a longer TTL
final Map<String, AttributeValue> item = e.cancellationReasons().get(0).item();
final UUID existingOwner = AttributeValues.getUUID(item, UsernameTable.ATTR_ACCOUNT_UUID, null);
final boolean confirmed = AttributeValues.getBool(item, UsernameTable.ATTR_CONFIRMED, false);
final long existingTtl = AttributeValues.getLong(item, UsernameTable.ATTR_TTL, 0L);
if (uuid.equals(existingOwner) && !confirmed && existingTtl > expirationTimeSeconds) {
// We failed because we provided a shorter TTL than the one that exists on the reservation. The caller
// can retry with updated expiration time.
throw new TtlConflictException(existingTtl);
}
throw ExceptionUtils.wrap(new UsernameHashNotAvailableException());
} else if (conditionalCheckFailed(e.cancellationReasons().get(1)) ||
e.cancellationReasons().stream().anyMatch(Accounts::isTransactionConflict)) {
// The accounts table fails the conditional check or either table was concurrently updated, it's an
// optimistic locking failure and we should try again.
throw new ContestedOptimisticLockException();
} else {
throw ExceptionUtils.wrap(e);
}
}))
.whenComplete((response, throwable) -> {
sample.stop(RESERVE_USERNAME_TIMER);
}));
}
if (throwable == null) {
account.setVersion(account.getVersion() + 1);
} else {
account.setReservedUsernameHash(maybeOriginalReservation.orElse(null));
}
})
.thenRun(() -> {});
/**
* Add a held usernameHash to the account object.
* <p>
* An account may only have up to MAX_USERNAME_HOLDS held usernames. If adding this hold pushes the account over this
* limit, a usernameHash is returned that the caller must release their hold on.
* <p>
* This only tracks the holds associated with the account, ensuring that no other account can take a held username is
* done via the username constraint table, and should be done transactionally with writing the updated account.
*
* @param accountToUpdate The account to update (in-place)
* @param newHold A username hash to add to the account's holds
* @param now The current time
* @return If present, an old hold that the caller should remove from the username constraint table
*/
private Optional<byte[]> addToHolds(final Account accountToUpdate, final byte[] newHold, final Instant now) {
List<Account.UsernameHold> holds = new ArrayList<>(accountToUpdate.getUsernameHolds());
final Account.UsernameHold holdToAdd = new Account.UsernameHold(newHold,
now.plus(USERNAME_HOLD_DURATION).getEpochSecond());
// Remove any holds that are
// - expired
// - match what we're trying to add (we'll re-add it at the end of the list to refresh the ttl)
// - match our current username
holds.removeIf(hold -> hold.expirationSecs() < now.getEpochSecond()
|| Arrays.equals(newHold, hold.usernameHash())
|| accountToUpdate.getUsernameHash().map(curr -> Arrays.equals(curr, hold.usernameHash())).orElse(false));
// add the new hold
holds.add(holdToAdd);
if (holds.size() <= MAX_USERNAME_HOLDS) {
accountToUpdate.setUsernameHolds(holds);
Metrics.counter(USERNAME_HOLD_ADDED_COUNTER_NAME, "max", String.valueOf(false)).increment();
return Optional.empty();
} else {
accountToUpdate.setUsernameHolds(holds.subList(1, holds.size()));
Metrics.counter(USERNAME_HOLD_ADDED_COUNTER_NAME, "max", String.valueOf(true)).increment();
// Newer holds are always added to the end of the holds list, so the first hold is always the oldest hold. Note
// that if a duplicate hold is added, we remove it from the list and re-add it at the end, this preserves hold
// ordering
return Optional.of(holds.getFirst().usernameHash());
}
}
/**
* Transaction item to update the usernameConstraintTable to "hold" a usernameHash for an account
*
* @param holder The account with the hold.
* @param usernameHash The hash to reserve for the account
* @param now The current time
* @return A transaction item that will update the usernameConstraintTable.
*/
private TransactWriteItem holdUsernameTransactItem(final UUID holder, final byte[] usernameHash, final Instant now) {
return TransactWriteItem.builder().put(Put.builder()
.tableName(usernamesConstraintTableName)
.item(Map.of(
UsernameTable.KEY_USERNAME_HASH, AttributeValues.fromByteArray(usernameHash),
UsernameTable.ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(holder),
UsernameTable.ATTR_CONFIRMED, AttributeValues.fromBool(false),
UsernameTable.ATTR_TTL,
AttributeValues.fromLong(now.plus(USERNAME_HOLD_DURATION).getEpochSecond())))
.build()).build();
}
/**
* Transaction item to release a hold on the usernameConstraintTable
*
* @param holder The account with the hold.
* @param usernameHashToRelease The hash to release for the account
* @param now The current time
* @return A transaction item that will update the usernameConstraintTable. The transaction will fail with a condition
* exception if someone else has a reservation for usernameHashToRelease
*/
private TransactWriteItem releaseHoldIfAllowedTransactItem(
final UUID holder, final byte[] usernameHashToRelease, final Instant now) {
return TransactWriteItem.builder().delete(Delete.builder()
.tableName(usernamesConstraintTableName)
.key(Map.of(UsernameTable.KEY_USERNAME_HASH, AttributeValues.b(usernameHashToRelease)))
// we can release the hold if we own it (and it's not our confirmed username) or if no one owns it
.conditionExpression("(#aci = :aci AND #confirmed = :false) OR #ttl < :now OR attribute_not_exists(#usernameHash)")
.expressionAttributeNames(Map.of(
"#usernameHash", UsernameTable.KEY_USERNAME_HASH,
"#aci", UsernameTable.ATTR_ACCOUNT_UUID,
"#confirmed", UsernameTable.ATTR_CONFIRMED,
"#ttl", UsernameTable.ATTR_TTL))
.expressionAttributeValues(Map.of(
":aci", AttributeValues.b(holder),
":now", AttributeValues.n(now.getEpochSecond()),
":false", AttributeValues.fromBool(false)))
.build()).build();
}
/**
@@ -551,12 +713,14 @@ public class Accounts extends AbstractDynamoDbStore {
return pickLinkHandle(account, usernameHash)
.thenCompose(linkHandle -> {
final Optional<byte[]> maybeOriginalUsernameHash = account.getUsernameHash();
final Account updatedAccount = AccountUtil.cloneAccountAsNotStale(account);
updatedAccount.setUsernameHash(usernameHash);
updatedAccount.setReservedUsernameHash(null);
updatedAccount.setUsernameLinkDetails(encryptedUsername == null ? null : linkHandle, encryptedUsername);
final Optional<byte[]> maybeOriginalUsernameHash = account.getUsernameHash();
final Instant now = clock.instant();
final Optional<byte[]> holdToRemove = maybeOriginalUsernameHash
.flatMap(hold -> addToHolds(updatedAccount, hold, now));
final List<TransactWriteItem> writeItems = new ArrayList<>();
@@ -585,18 +749,22 @@ public class Accounts extends AbstractDynamoDbStore {
// 1: update the account object (conditioned on the version increment)
writeItems.add(UpdateAccountSpec.forAccount(accountsTableName, updatedAccount).transactItem());
// 2?: remove the old username hash (if it exists) from the username constraint table
maybeOriginalUsernameHash.ifPresent(originalUsernameHash -> writeItems.add(
buildDelete(usernamesConstraintTableName, UsernameTable.KEY_USERNAME_HASH, originalUsernameHash)));
// 2?: Add a temporary hold for the old username to stop others from claiming it
maybeOriginalUsernameHash.ifPresent(originalUsernameHash ->
writeItems.add(holdUsernameTransactItem(updatedAccount.getUuid(), originalUsernameHash, now)));
// 3?: Adding that hold may have caused our account to exceed our maximum holds. Release an old hold
holdToRemove.ifPresent(oldHold ->
writeItems.add(releaseHoldIfAllowedTransactItem(updatedAccount.getUuid(), oldHold, now)));
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(writeItems).build())
.thenApply(ignored -> linkHandle);
.thenApply(ignored -> updatedAccount);
})
.thenApply(linkHandle -> {
.thenApply(updatedAccount -> {
account.setUsernameHash(usernameHash);
account.setReservedUsernameHash(null);
account.setUsernameLinkDetails(encryptedUsername == null ? null : linkHandle, encryptedUsername);
account.setUsernameLinkDetails(updatedAccount.getUsernameLinkHandle(), updatedAccount.getEncryptedUsername().orElse(null));
account.setUsernameHolds(updatedAccount.getUsernameHolds());
account.setVersion(account.getVersion() + 1);
return (Void) null;
})
@@ -606,8 +774,14 @@ public class Accounts extends AbstractDynamoDbStore {
// updated, it's an optimistic locking failure and we should try again.
if (conditionalCheckFailed(e.cancellationReasons().get(0))) {
throw ExceptionUtils.wrap(new UsernameHashNotAvailableException());
} else if (conditionalCheckFailed(e.cancellationReasons().get(1)) ||
e.cancellationReasons().stream().anyMatch(Accounts::isTransactionConflict)) {
} else if (conditionalCheckFailed(e.cancellationReasons().get(1)) // Account version conflict
// When we looked at the holds on our account, we thought we still held the corresponding username
// reservation. But it turned out that someone else has taken the reservation since. This means that the
// TTL on the hold must have just expired, so if we retry we should see that our hold is expired, and we
// won't try to remove it again.
|| (e.cancellationReasons().size() > 3 && conditionalCheckFailed(e.cancellationReasons().get(3)))
// concurrent update on any table
|| e.cancellationReasons().stream().anyMatch(Accounts::isTransactionConflict)) {
throw new ContestedOptimisticLockException();
} else {
throw ExceptionUtils.wrap(e);
@@ -659,21 +833,36 @@ public class Accounts extends AbstractDynamoDbStore {
updatedAccount.setUsernameHash(null);
updatedAccount.setUsernameLinkDetails(null, null);
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
.transactItems(List.of(
// 0: remove the username from the account object, conditioned on account version
UpdateAccountSpec.forAccount(accountsTableName, updatedAccount).transactItem(),
// 1: remote the username from the constraint table
buildDelete(usernamesConstraintTableName, UsernameTable.KEY_USERNAME_HASH, usernameHash)))
.build())
final Instant now = clock.instant();
final Optional<byte[]> holdToRemove = addToHolds(updatedAccount, usernameHash, now);
final List<TransactWriteItem> items = new ArrayList<>();
// 0: remove the username from the account object, conditioned on account version
items.add(UpdateAccountSpec.forAccount(accountsTableName, updatedAccount).transactItem());
// 1: Un-confirm our username, adding a temporary hold for the old username to stop others from claiming it
items.add(holdUsernameTransactItem(updatedAccount.getUuid(), usernameHash, now));
// 2?: Adding that hold may have caused our account to exceed our maximum holds. Release an old hold
holdToRemove.ifPresent(oldHold -> items.add(releaseHoldIfAllowedTransactItem(updatedAccount.getUuid(), oldHold, now)));
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder().transactItems(items).build())
.thenAccept(ignored -> {
account.setUsernameHash(null);
account.setUsernameLinkDetails(null, null);
account.setVersion(account.getVersion() + 1);
account.setUsernameHolds(updatedAccount.getUsernameHolds());
})
.exceptionally(ExceptionUtils.exceptionallyHandler(TransactionCanceledException.class, e -> {
if (conditionalCheckFailed(e.cancellationReasons().get(0)) ||
e.cancellationReasons().stream().anyMatch(Accounts::isTransactionConflict)) {
if (conditionalCheckFailed(e.cancellationReasons().get(0)) // Account version conflict
// When we looked at the holds on our account, we thought we still held the corresponding username
// reservation. But it turned out that someone else has taken the reservation since. This means that the
// TTL on the hold must have just expired, so if we retry we should see that our hold is expired, and we
// won't try to remove it again.
|| (e.cancellationReasons().size() > 2 && conditionalCheckFailed(e.cancellationReasons().get(2)))
// concurrent update on any table
|| e.cancellationReasons().stream().anyMatch(Accounts::isTransactionConflict)) {
throw new ContestedOptimisticLockException();
} else {
throw ExceptionUtils.wrap(e);

View File

@@ -0,0 +1,114 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class RemoveExpiredUsernameHoldsCommand extends AbstractSinglePassCrawlAccountsCommand {
private final Clock clock;
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final int DEFAULT_MAX_CONCURRENCY = 16;
private static final String DELETED_HOLDS_COUNTER_NAME =
name(RemoveExpiredUsernameHoldsCommand.class, "expiredHolds");
private static final String UPDATED_ACCOUNTS_COUNTER_NAME =
name(RemoveExpiredUsernameHoldsCommand.class, "accountsWithExpiredHolds");
private static final Logger log = LoggerFactory.getLogger(RemoveExpiredUsernameHoldsCommand.class);
public RemoveExpiredUsernameHoldsCommand(final Clock clock) {
super("remove-expired-username-holds", "Removes expired username holds from account records");
this.clock = clock;
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, don't actually delete holds");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(DEFAULT_MAX_CONCURRENCY)
.help("Max concurrency for DynamoDB operations");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final Counter deletedHoldsCounter =
Metrics.counter(DELETED_HOLDS_COUNTER_NAME, "dryRun", String.valueOf(isDryRun));
final Counter updatedAccountsCounter =
Metrics.counter(UPDATED_ACCOUNTS_COUNTER_NAME, "dryRun", String.valueOf(isDryRun));
final AccountsManager accountManager = getCommandDependencies().accountsManager();
accounts.flatMap(account -> {
final List<Account.UsernameHold> holds = new ArrayList<>(account.getUsernameHolds());
int holdsToRemove = removeExpired(holds);
final Mono<Void> purgeMono = isDryRun || holdsToRemove == 0
? Mono.empty()
: Mono.fromFuture(() ->
accountManager.updateAsync(account, a -> a.setUsernameHolds(holds)).thenRun(Util.NOOP));
return purgeMono
.doOnSuccess(ignored -> {
deletedHoldsCounter.increment(holdsToRemove);
updatedAccountsCounter.increment();
})
.onErrorResume(throwable -> {
log.warn("Failed to purge {} expired holds on account {}", holdsToRemove, account.getUuid());
return Mono.empty();
});
}, maxConcurrency)
.then().block();
}
@VisibleForTesting
int removeExpired(final List<Account.UsernameHold> holds) {
final Instant now = this.clock.instant();
int holdsToRemove = 0;
for (Iterator<Account.UsernameHold> it = holds.iterator(); it.hasNext(); ) {
if (it.next().expirationSecs() < now.getEpochSecond()) {
holdsToRemove++;
it.remove();
}
}
return holdsToRemove;
}
}