Always expose sequential fluxes to account crawlers

This commit is contained in:
Jon Chambers
2023-12-08 13:32:53 -05:00
committed by Jon Chambers
parent cca747a1f6
commit 5b0fcbe854
9 changed files with 15 additions and 20 deletions

View File

@@ -45,7 +45,6 @@ import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -1058,7 +1057,7 @@ public class Accounts extends AbstractDynamoDbStore {
.thenRun(() -> sample.stop(DELETE_TIMER));
}
ParallelFlux<Account> getAll(final int segments, final Scheduler scheduler) {
Flux<Account> getAll(final int segments, final Scheduler scheduler) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");
}
@@ -1073,7 +1072,8 @@ public class Accounts extends AbstractDynamoDbStore {
.totalSegments(segments)
.build())
.items()
.map(Accounts::fromItem));
.map(Accounts::fromItem))
.sequential();
}
@Nonnull

View File

@@ -69,7 +69,7 @@ import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
@@ -940,7 +940,7 @@ public class AccountsManager {
return accounts.findRecentlyDeletedE164(uuid);
}
public ParallelFlux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
public Flux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
return accounts.getAll(segments, scheduler);
}

View File

@@ -18,7 +18,7 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public abstract class AbstractSinglePassCrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
@@ -103,5 +103,5 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment
logger.error("Unhandled error", throwable);
}
protected abstract void crawlAccounts(final ParallelFlux<Account> accounts);
protected abstract void crawlAccounts(final Flux<Account> accounts);
}

View File

@@ -22,7 +22,6 @@ import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
@@ -63,13 +62,12 @@ public class MigrateSignedECPreKeysCommand extends AbstractSinglePassCrawlAccoun
}
@Override
protected void crawlAccounts(final ParallelFlux<Account> accounts) {
protected void crawlAccounts(final Flux<Account> accounts) {
final KeysManager keysManager = getCommandDependencies().keysManager();
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int bufferSize = getNamespace().getInt(BUFFER_ARGUMENT);
accounts
.sequential()
.flatMap(account -> Flux.fromIterable(account.getDevices())
.flatMap(device -> Flux.fromArray(IdentityType.values())
.filter(identityType -> device.getSignedPreKey(identityType) != null)

View File

@@ -20,8 +20,8 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
public class ProcessPushNotificationFeedbackCommand extends AbstractSinglePassCrawlAccountsCommand {
@@ -62,12 +62,11 @@ public class ProcessPushNotificationFeedbackCommand extends AbstractSinglePassCr
}
@Override
protected void crawlAccounts(final ParallelFlux<Account> accounts) {
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
accounts
.filter(account -> account.getDevices().stream().anyMatch(this::deviceNeedsUpdate))
.sequential()
.flatMap(account -> {
account.getDevices().stream()
.filter(this::deviceNeedsUpdate)

View File

@@ -19,7 +19,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Flux;
public class RemoveExpiredAccountsCommand extends AbstractSinglePassCrawlAccountsCommand {
@@ -57,13 +57,12 @@ public class RemoveExpiredAccountsCommand extends AbstractSinglePassCrawlAccount
}
@Override
protected void crawlAccounts(final ParallelFlux<Account> accounts) {
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean isDryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final Counter deletedAccountCounter =
Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, "dryRun", String.valueOf(isDryRun));
accounts.filter(this::isExpired)
.sequential()
.flatMap(expiredAccount -> {
final Mono<Void> deleteAccountMono = isDryRun
? Mono.empty()