Remove AccountCrawler (and doPeriodicWork) from WhisperServerService

This commit is contained in:
Chris Eager
2023-06-23 12:46:48 -05:00
committed by Chris Eager
parent f26bc70b59
commit f8fefe2e5e
11 changed files with 30 additions and 395 deletions

View File

@@ -155,10 +155,6 @@ import org.whispersystems.textsecuregcm.spam.RateLimitChallengeListener;
import org.whispersystems.textsecuregcm.spam.ReportSpamTokenProvider;
import org.whispersystems.textsecuregcm.spam.ScoreThresholdProvider;
import org.whispersystems.textsecuregcm.spam.SpamFilter;
import org.whispersystems.textsecuregcm.storage.AccountCleaner;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
import org.whispersystems.textsecuregcm.storage.AccountLockManager;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
@@ -171,12 +167,10 @@ import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.NonNormalizedAccountCrawlerListener;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
import org.whispersystems.textsecuregcm.storage.PushChallengeDynamoDb;
import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
import org.whispersystems.textsecuregcm.storage.RedeemedReceiptsManager;
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswords;
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
@@ -386,10 +380,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService accountDeletionExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountCleaner-%d")).maxThreads(16).minThreads(16).build();
ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle()
.executorService(name(getClass(), "pushFeedback-%d")).maxThreads(4).minThreads(4).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
@@ -562,31 +552,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Optional.empty());
ChangeNumberManager changeNumberManager = new ChangeNumberManager(messageSender, accountsManager);
AccountDatabaseCrawlerCache accountCleanerAccountDatabaseCrawlerCache =
new AccountDatabaseCrawlerCache(cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX);
AccountDatabaseCrawler accountCleanerAccountDatabaseCrawler = new AccountDatabaseCrawler("Account cleaner crawler",
accountsManager,
accountCleanerAccountDatabaseCrawlerCache,
List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
);
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
new NonNormalizedAccountCrawlerListener(accountsManager, metricsCluster),
// PushFeedbackProcessor may update device properties
new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor));
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster,
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler("General-purpose account crawler",
accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
config.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
);
HttpClient currencyClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).connectTimeout(Duration.ofSeconds(10)).build();
FixerClient fixerClient = new FixerClient(currencyClient, config.getPaymentsServiceConfiguration().fixerApiKey().value());
CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds());
@@ -596,8 +561,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(apnPushNotificationScheduler);
environment.lifecycle().manage(provisioningManager);
environment.lifecycle().manage(accountDatabaseCrawler);
environment.lifecycle().manage(accountCleanerAccountDatabaseCrawler);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);
environment.lifecycle().manage(clientPresenceManager);

View File

@@ -1,10 +0,0 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
public record DynamicAccountDatabaseCrawlerConfiguration(boolean periodicWorkEnabled, boolean crawlAllEnabled) {
}

View File

@@ -56,16 +56,10 @@ public class DynamicConfiguration {
@Valid
DynamicMessagePersisterConfiguration messagePersister = new DynamicMessagePersisterConfiguration();
@JsonProperty
@Valid
DynamicRateLimitPolicy rateLimitPolicy = new DynamicRateLimitPolicy(false);
@JsonProperty
@Valid
DynamicAccountDatabaseCrawlerConfiguration accountDatabaseCrawler = new DynamicAccountDatabaseCrawlerConfiguration(
true, false);
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@@ -112,7 +106,4 @@ public class DynamicConfiguration {
return rateLimitPolicy;
}
public DynamicAccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() {
return accountDatabaseCrawler;
}
}

View File

@@ -9,21 +9,15 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class AccountDatabaseCrawler implements Managed, Runnable {
public class AccountDatabaseCrawler {
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@@ -32,7 +26,6 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
name(AccountDatabaseCrawler.class, "processChunk"));
private static final long WORKER_TTL_MS = 120_000L;
private static final long CHUNK_INTERVAL_MILLIS = Duration.ofSeconds(2).toMillis();
private final String name;
private final AccountsManager accounts;
@@ -41,58 +34,17 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private final AccountDatabaseCrawlerCache cache;
private final List<AccountDatabaseCrawlerListener> listeners;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public AccountDatabaseCrawler(final String name,
AccountsManager accounts,
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
int chunkSize,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
int chunkSize) {
this.name = name;
this.accounts = accounts;
this.chunkSize = chunkSize;
this.workerId = UUID.randomUUID().toString();
this.cache = cache;
this.listeners = listeners;
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
@Override
public synchronized void start() {
running.set(true);
new Thread(this).start();
}
@Override
public synchronized void stop() {
running.set(false);
notifyAll();
while (!finished) {
Util.wait(this);
}
}
@Override
public void run() {
while (running.get()) {
try {
doPeriodicWork();
sleepWhileRunning(CHUNK_INTERVAL_MILLIS);
} catch (Throwable t) {
logger.warn("{}: error in database crawl: {}: {}", name, t.getClass().getSimpleName(), t.getMessage(), t);
Util.sleep(10000);
}
}
synchronized (this) {
finished = true;
notifyAll();
}
}
public void crawlAllAccounts() {
@@ -101,7 +53,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
return;
}
try {
Optional<UUID> fromUuid = cache.getLastUuid();
Optional<UUID> fromUuid = getLastUuid();
if (fromUuid.isEmpty()) {
logger.info("{}: Started crawl", name);
@@ -110,83 +62,26 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
logger.info("{}: Resuming crawl", name);
}
try {
AccountCrawlChunk chunkAccounts;
do {
if (!dynamicConfigurationManager.getConfiguration().getAccountDatabaseCrawlerConfiguration()
.crawlAllEnabled()) {
logger.warn("Exiting crawl - not enabled by dynamic configuration");
return;
}
try (Timer.Context timer = processChunkTimer.time()) {
logger.debug("{}: Processing chunk", name);
chunkAccounts = readChunk(fromUuid, chunkSize);
AccountCrawlChunk chunkAccounts;
do {
try (Timer.Context timer = processChunkTimer.time()) {
logger.debug("{}: Processing chunk", name);
chunkAccounts = readChunk(fromUuid, chunkSize);
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
}
fromUuid = chunkAccounts.getLastUuid();
cacheLastUuid(fromUuid);
}
} while (!chunkAccounts.getAccounts().isEmpty());
logger.info("{}: Finished crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd);
} catch (AccountDatabaseCrawlerRestartException e) {
logger.warn("Crawl stopped", e);
}
} finally {
cache.releaseActiveWork(workerId);
}
}
@VisibleForTesting
public void doPeriodicWork() {
if (!dynamicConfigurationManager.getConfiguration().getAccountDatabaseCrawlerConfiguration()
.periodicWorkEnabled()) {
return;
}
if (cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
try {
processChunk();
} finally {
cache.releaseActiveWork(workerId);
}
}
}
private void processChunk() {
try (Timer.Context timer = processChunkTimer.time()) {
final Optional<UUID> fromUuid = getLastUuid();
if (fromUuid.isEmpty()) {
logger.info("{}: Started crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
}
final AccountCrawlChunk chunkAccounts = readChunk(fromUuid, chunkSize);
if (chunkAccounts.getAccounts().isEmpty()) {
logger.info("{}: Finished crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd);
cacheLastUuid(Optional.empty());
} else {
logger.debug("{}: Processing chunk", name);
try {
for (AccountDatabaseCrawlerListener listener : listeners) {
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
}
cacheLastUuid(chunkAccounts.getLastUuid());
} catch (AccountDatabaseCrawlerRestartException e) {
cacheLastUuid(Optional.empty());
fromUuid = chunkAccounts.getLastUuid();
cacheLastUuid(fromUuid);
}
}
} while (!chunkAccounts.getAccounts().isEmpty());
logger.info("{}: Finished crawl", name);
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd);
} finally {
cache.releaseActiveWork(workerId);
}
}
@@ -213,10 +108,4 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
cache.setLastUuid(lastUuid);
}
private synchronized void sleepWhileRunning(long delayMs) {
if (running.get()) {
Util.wait(this, delayMs);
}
}
}

View File

@@ -16,19 +16,19 @@ import org.whispersystems.textsecuregcm.util.Constants;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public abstract class AccountDatabaseCrawlerListener {
private Timer processChunkTimer;
private final Timer processChunkTimer;
abstract public void onCrawlStart();
abstract public void onCrawlEnd();
abstract protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException;
abstract protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts);
public AccountDatabaseCrawlerListener() {
processChunkTimer = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME).timer(name(AccountDatabaseCrawlerListener.class, "processChunk", getClass().getSimpleName()));
}
public void timeAndProcessCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) throws AccountDatabaseCrawlerRestartException {
public void timeAndProcessCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
try (Timer.Context timer = processChunkTimer.time()) {
onCrawlChunk(fromUuid, chunkAccounts);
}

View File

@@ -1,15 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
public class AccountDatabaseCrawlerRestartException extends Exception {
public AccountDatabaseCrawlerRestartException(String s) {
super(s);
}
public AccountDatabaseCrawlerRestartException(Exception e) {
super(e);
}
}

View File

@@ -123,8 +123,7 @@ public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfig
yield new AccountDatabaseCrawler("General-purpose account crawler",
accountsManager,
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize()
);
}
case ACCOUNT_CLEANER -> {
@@ -138,8 +137,7 @@ public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfig
accountsManager,
accountDatabaseCrawlerCache,
List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize(),
dynamicConfigurationManager
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize()
);
}
};