Use one thread per account in the "encrypt timestamps" crawler

This commit is contained in:
Katherine
2025-07-29 09:51:36 -04:00
committed by GitHub
parent 8aa408a3c1
commit 5f5c345f94
2 changed files with 50 additions and 65 deletions

View File

@@ -12,10 +12,12 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import net.sourceforge.argparse4j.inf.Subparser;
import org.signal.libsignal.protocol.IdentityKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.EncryptDeviceCreationTimestampUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Mono;
@@ -28,8 +30,6 @@ public class EncryptDeviceCreationTimestampCommand extends AbstractSinglePassCra
private static final int MAX_CONCURRENCY = 16;
private static final String ENCRYPTED_CREATION_TIMESTAMP_COUNTER_NAME =
name(EncryptDeviceCreationTimestampCommand.class, "encryptedCreationTimestamp");
private static final String PROCESSED_ACCOUNT_COUNTER_NAME =
name(EncryptDeviceCreationTimestampCommand.class, "processedAccount");
@@ -54,35 +54,33 @@ public class EncryptDeviceCreationTimestampCommand extends AbstractSinglePassCra
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final Counter encryptedTimestampCounter =
Metrics.counter(ENCRYPTED_CREATION_TIMESTAMP_COUNTER_NAME, "dryRun", String.valueOf(isDryRun));
final Counter processedAccountCounter =
Metrics.counter(PROCESSED_ACCOUNT_COUNTER_NAME, "dryRun", String.valueOf(isDryRun));
accounts
.flatMap(account ->
Flux.fromIterable(account.getDevices())
.flatMap(device -> {
final byte[] createdAtCiphertext = EncryptDeviceCreationTimestampUtil.encrypt(
device.getCreated(), account.getIdentityKey(IdentityType.ACI),
device.getId(), device.getRegistrationId(IdentityType.ACI));
final Mono<Void> encryptTimestampMono = isDryRun
? Mono.empty()
: Mono.fromFuture(() -> getCommandDependencies().accountsManager().updateDeviceAsync(
account, device.getId(), d -> d.setCreatedAtCiphertext(createdAtCiphertext))
.thenRun(Util.NOOP));
return encryptTimestampMono
.doOnSuccess(_ -> encryptedTimestampCounter.increment())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4)))
.onErrorResume(throwable -> {
log.warn("Failed to encrypt creation timestamp on device {}, account {}", device.getId(), account.getUuid(), throwable);
return Mono.empty();
});
}, MAX_CONCURRENCY)
.then()
.doOnSuccess(_ -> processedAccountCounter.increment()),
MAX_CONCURRENCY)
.flatMap(account -> {
Mono<Void> encryptTimestampMono = isDryRun
? Mono.empty()
: Mono.fromFuture(
() -> getCommandDependencies().accountsManager().updateAsync(account, a -> {
final IdentityKey aciIdentityKey = account.getIdentityKey(IdentityType.ACI);
for (final Device device : a.getDevices()) {
final byte[] createdAtCiphertext = EncryptDeviceCreationTimestampUtil.encrypt(
device.getCreated(), aciIdentityKey,
device.getId(), device.getRegistrationId(IdentityType.ACI));
device.setCreatedAtCiphertext(createdAtCiphertext);
}
}).thenRun(Util.NOOP));
return encryptTimestampMono
.doOnSuccess(_ -> processedAccountCounter.increment())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4)))
.onErrorResume(throwable -> {
log.warn("Failed to encrypt creation timestamps on account {}", account.getUuid(), throwable);
return Mono.empty();
});
}, MAX_CONCURRENCY)
.then()
.block();
log.info("Finished encrypting device timestamps");
}
}