mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-21 09:48:05 +01:00
Discard old chunk-based account crawler machinery
This commit is contained in:
committed by
Jon Chambers
parent
9d47a6f41f
commit
744eb58071
@@ -16,7 +16,6 @@ import java.util.Set;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import org.whispersystems.textsecuregcm.attachments.TusConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.AccountDatabaseCrawlerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.AdminEventLoggingConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.ApnConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.AppConfigConfiguration;
|
||||
@@ -132,11 +131,6 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
@JsonProperty
|
||||
private SecureValueRecovery2Configuration svr2;
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@JsonProperty
|
||||
private AccountDatabaseCrawlerConfiguration accountDatabaseCrawler;
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@JsonProperty
|
||||
@@ -374,10 +368,6 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
return storageService;
|
||||
}
|
||||
|
||||
public AccountDatabaseCrawlerConfiguration getAccountDatabaseCrawlerConfiguration() {
|
||||
return accountDatabaseCrawler;
|
||||
}
|
||||
|
||||
public MessageCacheConfiguration getMessageCacheConfiguration() {
|
||||
return messageCache;
|
||||
}
|
||||
|
||||
@@ -212,7 +212,6 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.workers.AssignUsernameCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MigrateSignedECPreKeysCommand;
|
||||
@@ -270,7 +269,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
bootstrap.addCommand(new SetUserDiscoverabilityCommand());
|
||||
bootstrap.addCommand(new AssignUsernameCommand());
|
||||
bootstrap.addCommand(new UnlinkDeviceCommand());
|
||||
bootstrap.addCommand(new CrawlAccountsCommand());
|
||||
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
|
||||
bootstrap.addCommand(new MessagePersisterServiceCommand());
|
||||
bootstrap.addCommand(new MigrateSignedECPreKeysCommand());
|
||||
@@ -329,8 +327,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
config.getDynamoDbTables().getAccounts().getPhoneNumberTableName(),
|
||||
config.getDynamoDbTables().getAccounts().getPhoneNumberIdentifierTableName(),
|
||||
config.getDynamoDbTables().getAccounts().getUsernamesTableName(),
|
||||
config.getDynamoDbTables().getDeletedAccounts().getTableName(),
|
||||
config.getDynamoDbTables().getAccounts().getScanPageSize());
|
||||
config.getDynamoDbTables().getDeletedAccounts().getTableName());
|
||||
ClientReleases clientReleases = new ClientReleases(dynamoDbAsyncClient,
|
||||
config.getDynamoDbTables().getClientReleases().getTableName());
|
||||
PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(dynamoDbClient,
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class AccountDatabaseCrawlerConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
private int chunkSize = 1000;
|
||||
|
||||
public int getChunkSize() {
|
||||
return chunkSize;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -10,22 +10,19 @@ public class AccountsTableConfiguration extends Table {
|
||||
private final String phoneNumberTableName;
|
||||
private final String phoneNumberIdentifierTableName;
|
||||
private final String usernamesTableName;
|
||||
private final int scanPageSize;
|
||||
|
||||
@JsonCreator
|
||||
public AccountsTableConfiguration(
|
||||
@JsonProperty("tableName") final String tableName,
|
||||
@JsonProperty("phoneNumberTableName") final String phoneNumberTableName,
|
||||
@JsonProperty("phoneNumberIdentifierTableName") final String phoneNumberIdentifierTableName,
|
||||
@JsonProperty("usernamesTableName") final String usernamesTableName,
|
||||
@JsonProperty("scanPageSize") final int scanPageSize) {
|
||||
@JsonProperty("usernamesTableName") final String usernamesTableName) {
|
||||
|
||||
super(tableName);
|
||||
|
||||
this.phoneNumberTableName = phoneNumberTableName;
|
||||
this.phoneNumberIdentifierTableName = phoneNumberIdentifierTableName;
|
||||
this.usernamesTableName = usernamesTableName;
|
||||
this.scanPageSize = scanPageSize;
|
||||
}
|
||||
|
||||
@NotBlank
|
||||
@@ -42,8 +39,4 @@ public class AccountsTableConfiguration extends Table {
|
||||
public String getUsernamesTableName() {
|
||||
return usernamesTableName;
|
||||
}
|
||||
|
||||
public int getScanPageSize() {
|
||||
return scanPageSize;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
public class AccountCrawlChunk {
|
||||
|
||||
private final List<Account> accounts;
|
||||
@Nullable
|
||||
private final UUID lastUuid;
|
||||
|
||||
public AccountCrawlChunk(final List<Account> accounts, @Nullable final UUID lastUuid) {
|
||||
this.accounts = accounts;
|
||||
this.lastUuid = lastUuid;
|
||||
}
|
||||
|
||||
public List<Account> getAccounts() {
|
||||
return accounts;
|
||||
}
|
||||
|
||||
public Optional<UUID> getLastUuid() {
|
||||
return Optional.ofNullable(lastUuid);
|
||||
}
|
||||
}
|
||||
@@ -1,111 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public class AccountDatabaseCrawler {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AccountDatabaseCrawler.class);
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Timer readChunkTimer = metricRegistry.timer(name(AccountDatabaseCrawler.class, "readChunk"));
|
||||
private static final Timer processChunkTimer = metricRegistry.timer(
|
||||
name(AccountDatabaseCrawler.class, "processChunk"));
|
||||
|
||||
private static final long WORKER_TTL_MS = 120_000L;
|
||||
|
||||
private final String name;
|
||||
private final AccountsManager accounts;
|
||||
private final int chunkSize;
|
||||
private final String workerId;
|
||||
private final AccountDatabaseCrawlerCache cache;
|
||||
private final List<AccountDatabaseCrawlerListener> listeners;
|
||||
|
||||
public AccountDatabaseCrawler(final String name,
|
||||
AccountsManager accounts,
|
||||
AccountDatabaseCrawlerCache cache,
|
||||
List<AccountDatabaseCrawlerListener> listeners,
|
||||
int chunkSize) {
|
||||
this.name = name;
|
||||
this.accounts = accounts;
|
||||
this.chunkSize = chunkSize;
|
||||
this.workerId = UUID.randomUUID().toString();
|
||||
this.cache = cache;
|
||||
this.listeners = listeners;
|
||||
}
|
||||
|
||||
public void crawlAllAccounts() {
|
||||
if (!cache.claimActiveWork(workerId, WORKER_TTL_MS)) {
|
||||
logger.info("Did not claim active work");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Optional<UUID> fromUuid = getLastUuid();
|
||||
|
||||
if (fromUuid.isEmpty()) {
|
||||
logger.info("{}: Started crawl", name);
|
||||
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlStart);
|
||||
} else {
|
||||
logger.info("{}: Resuming crawl", name);
|
||||
}
|
||||
|
||||
AccountCrawlChunk chunkAccounts;
|
||||
do {
|
||||
try (Timer.Context timer = processChunkTimer.time()) {
|
||||
logger.debug("{}: Processing chunk", name);
|
||||
chunkAccounts = readChunk(fromUuid, chunkSize);
|
||||
|
||||
for (AccountDatabaseCrawlerListener listener : listeners) {
|
||||
listener.timeAndProcessCrawlChunk(fromUuid, chunkAccounts.getAccounts());
|
||||
}
|
||||
fromUuid = chunkAccounts.getLastUuid();
|
||||
cacheLastUuid(fromUuid);
|
||||
}
|
||||
|
||||
} while (!chunkAccounts.getAccounts().isEmpty());
|
||||
|
||||
logger.info("{}: Finished crawl", name);
|
||||
listeners.forEach(AccountDatabaseCrawlerListener::onCrawlEnd);
|
||||
|
||||
} finally {
|
||||
cache.releaseActiveWork(workerId);
|
||||
}
|
||||
}
|
||||
|
||||
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize) {
|
||||
return readChunk(fromUuid, chunkSize, readChunkTimer);
|
||||
}
|
||||
|
||||
private AccountCrawlChunk readChunk(Optional<UUID> fromUuid, int chunkSize, Timer readTimer) {
|
||||
try (Timer.Context timer = readTimer.time()) {
|
||||
|
||||
if (fromUuid.isPresent()) {
|
||||
return accounts.getAllFromDynamo(fromUuid.get(), chunkSize);
|
||||
}
|
||||
|
||||
return accounts.getAllFromDynamo(chunkSize);
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<UUID> getLastUuid() {
|
||||
return cache.getLastUuid();
|
||||
}
|
||||
|
||||
private void cacheLastUuid(final Optional<UUID> lastUuid) {
|
||||
cache.setLastUuid(lastUuid);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import io.lettuce.core.ScriptOutputType;
|
||||
import io.lettuce.core.SetArgs;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public class AccountDatabaseCrawlerCache {
|
||||
|
||||
public static final String GENERAL_PURPOSE_PREFIX = "";
|
||||
public static final String ACCOUNT_CLEANER_PREFIX = "account-cleaner";
|
||||
|
||||
private static final String ACTIVE_WORKER_KEY = "account_database_crawler_cache_active_worker";
|
||||
private static final String LAST_UUID_DYNAMO_KEY = "account_database_crawler_cache_last_uuid_dynamo";
|
||||
|
||||
private static final long LAST_NUMBER_TTL_MS = 86400_000L;
|
||||
|
||||
private final FaultTolerantRedisCluster cacheCluster;
|
||||
private final ClusterLuaScript unlockClusterScript;
|
||||
|
||||
private final String prefix;
|
||||
|
||||
public AccountDatabaseCrawlerCache(FaultTolerantRedisCluster cacheCluster, String prefix) throws IOException {
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/account_database_crawler/unlock.lua",
|
||||
ScriptOutputType.INTEGER);
|
||||
|
||||
this.prefix = prefix + "::";
|
||||
}
|
||||
|
||||
public boolean claimActiveWork(String workerId, long ttlMs) {
|
||||
return "OK".equals(cacheCluster.withCluster(connection -> connection.sync()
|
||||
.set(getPrefixedKey(ACTIVE_WORKER_KEY), workerId, SetArgs.Builder.nx().px(ttlMs))));
|
||||
}
|
||||
|
||||
public void releaseActiveWork(String workerId) {
|
||||
unlockClusterScript.execute(List.of(getPrefixedKey(ACTIVE_WORKER_KEY)), List.of(workerId));
|
||||
}
|
||||
|
||||
public Optional<UUID> getLastUuid() {
|
||||
final String lastUuidString = cacheCluster.withCluster(
|
||||
connection -> connection.sync().get(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
|
||||
|
||||
if (lastUuidString == null) {
|
||||
return Optional.empty();
|
||||
} else {
|
||||
return Optional.of(UUID.fromString(lastUuidString));
|
||||
}
|
||||
}
|
||||
|
||||
public void setLastUuid(Optional<UUID> lastUuid) {
|
||||
if (lastUuid.isPresent()) {
|
||||
cacheCluster.useCluster(
|
||||
connection -> connection.sync()
|
||||
.psetex(getPrefixedKey(LAST_UUID_DYNAMO_KEY), LAST_NUMBER_TTL_MS, lastUuid.get().toString()));
|
||||
} else {
|
||||
cacheCluster.useCluster(connection -> connection.sync().del(getPrefixedKey(LAST_UUID_DYNAMO_KEY)));
|
||||
}
|
||||
}
|
||||
|
||||
private String getPrefixedKey(final String key) {
|
||||
return prefix + key;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public abstract class AccountDatabaseCrawlerListener {
|
||||
|
||||
private final Timer processChunkTimer;
|
||||
|
||||
abstract public void onCrawlStart();
|
||||
|
||||
abstract public void onCrawlEnd();
|
||||
|
||||
abstract protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts);
|
||||
|
||||
public AccountDatabaseCrawlerListener() {
|
||||
processChunkTimer = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME).timer(name(AccountDatabaseCrawlerListener.class, "processChunk", getClass().getSimpleName()));
|
||||
}
|
||||
|
||||
public void timeAndProcessCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
|
||||
try (Timer.Context timer = processChunkTimer.time()) {
|
||||
onCrawlChunk(fromUuid, chunkAccounts);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -94,8 +94,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
private static final Timer GET_BY_USERNAME_LINK_HANDLE_TIMER = Metrics.timer(name(Accounts.class, "getByUsernameLinkHandle"));
|
||||
private static final Timer GET_BY_PNI_TIMER = Metrics.timer(name(Accounts.class, "getByPni"));
|
||||
private static final Timer GET_BY_UUID_TIMER = Metrics.timer(name(Accounts.class, "getByUuid"));
|
||||
private static final Timer GET_ALL_FROM_START_TIMER = Metrics.timer(name(Accounts.class, "getAllFrom"));
|
||||
private static final Timer GET_ALL_FROM_OFFSET_TIMER = Metrics.timer(name(Accounts.class, "getAllFromOffset"));
|
||||
private static final Timer DELETE_TIMER = Metrics.timer(name(Accounts.class, "delete"));
|
||||
|
||||
private static final String CONDITIONAL_CHECK_FAILED = "ConditionalCheckFailed";
|
||||
@@ -144,9 +142,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
private final String deletedAccountsTableName;
|
||||
private final String accountsTableName;
|
||||
|
||||
private final int scanPageSize;
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public Accounts(
|
||||
final Clock clock,
|
||||
@@ -156,8 +151,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
final String phoneNumberConstraintTableName,
|
||||
final String phoneNumberIdentifierConstraintTableName,
|
||||
final String usernamesConstraintTableName,
|
||||
final String deletedAccountsTableName,
|
||||
final int scanPageSize) {
|
||||
final String deletedAccountsTableName) {
|
||||
super(client);
|
||||
this.clock = clock;
|
||||
this.asyncClient = asyncClient;
|
||||
@@ -166,7 +160,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
this.accountsTableName = accountsTableName;
|
||||
this.usernamesConstraintTableName = usernamesConstraintTableName;
|
||||
this.deletedAccountsTableName = deletedAccountsTableName;
|
||||
this.scanPageSize = scanPageSize;
|
||||
}
|
||||
|
||||
public Accounts(
|
||||
@@ -176,11 +169,10 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
final String phoneNumberConstraintTableName,
|
||||
final String phoneNumberIdentifierConstraintTableName,
|
||||
final String usernamesConstraintTableName,
|
||||
final String deletedAccountsTableName,
|
||||
final int scanPageSize) {
|
||||
final String deletedAccountsTableName) {
|
||||
this(Clock.systemUTC(), client, asyncClient, accountsTableName,
|
||||
phoneNumberConstraintTableName, phoneNumberIdentifierConstraintTableName, usernamesConstraintTableName,
|
||||
deletedAccountsTableName, scanPageSize);
|
||||
deletedAccountsTableName);
|
||||
}
|
||||
|
||||
public boolean create(final Account account) {
|
||||
@@ -856,23 +848,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
.map(Accounts::fromItem));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public AccountCrawlChunk getAllFrom(final UUID from, final int maxCount) {
|
||||
final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
|
||||
.limit(scanPageSize)
|
||||
.exclusiveStartKey(Map.of(KEY_ACCOUNT_UUID, AttributeValues.fromUUID(from)));
|
||||
|
||||
return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_OFFSET_TIMER);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public AccountCrawlChunk getAllFromStart(final int maxCount) {
|
||||
final ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
|
||||
.limit(scanPageSize);
|
||||
|
||||
return scanForChunk(scanRequestBuilder, maxCount, GET_ALL_FROM_START_TIMER);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Optional<Account> getByIndirectLookup(
|
||||
final Timer timer,
|
||||
@@ -1103,14 +1078,6 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private AccountCrawlChunk scanForChunk(final ScanRequest.Builder scanRequestBuilder, final int maxCount, final Timer timer) {
|
||||
scanRequestBuilder.tableName(accountsTableName);
|
||||
final List<Map<String, AttributeValue>> items = requireNonNull(timer.record(() -> scan(scanRequestBuilder.build(), maxCount)));
|
||||
final List<Account> accounts = items.stream().map(Accounts::fromItem).toList();
|
||||
return new AccountCrawlChunk(accounts, accounts.size() > 0 ? accounts.get(accounts.size() - 1).getUuid() : null);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static String extractCancellationReasonCodes(final TransactionCanceledException exception) {
|
||||
return exception.cancellationReasons().stream()
|
||||
|
||||
@@ -837,14 +837,6 @@ public class AccountsManager {
|
||||
return accounts.findRecentlyDeletedE164(uuid);
|
||||
}
|
||||
|
||||
public AccountCrawlChunk getAllFromDynamo(int length) {
|
||||
return accounts.getAllFromStart(length);
|
||||
}
|
||||
|
||||
public AccountCrawlChunk getAllFromDynamo(UUID uuid, int length) {
|
||||
return accounts.getAllFrom(uuid, length);
|
||||
}
|
||||
|
||||
public ParallelFlux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
|
||||
return accounts.getAll(segments, scheduler);
|
||||
}
|
||||
|
||||
@@ -1,132 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2020 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
public class PushFeedbackProcessor extends AccountDatabaseCrawlerListener {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PushFeedbackProcessor.class);
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Meter expired = metricRegistry.meter(name(getClass(), "unregistered", "expired"));
|
||||
private final Meter recovered = metricRegistry.meter(name(getClass(), "unregistered", "recovered"));
|
||||
|
||||
private static final Counter UPDATED_ACCOUNT_COUNTER = Metrics.counter(
|
||||
MetricsUtil.name(PushFeedbackProcessor.class, "updatedAccounts"));
|
||||
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final ExecutorService updateExecutor;
|
||||
|
||||
public PushFeedbackProcessor(AccountsManager accountsManager, ExecutorService updateExecutor) {
|
||||
this.accountsManager = accountsManager;
|
||||
this.updateExecutor = updateExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCrawlStart() {}
|
||||
|
||||
@Override
|
||||
public void onCrawlEnd() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
|
||||
|
||||
final List<CompletableFuture<Void>> updateFutures = chunkAccounts.stream()
|
||||
.filter(account -> {
|
||||
boolean update = false;
|
||||
|
||||
for (Device device : account.getDevices()) {
|
||||
if (deviceNeedsUpdate(device)) {
|
||||
if (deviceExpired(device)) {
|
||||
if (device.isEnabled()) {
|
||||
expired.mark();
|
||||
update = true;
|
||||
}
|
||||
} else {
|
||||
recovered.mark();
|
||||
update = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return update;
|
||||
})
|
||||
.map(account -> CompletableFuture.runAsync(() -> {
|
||||
// fetch a new version, since the chunk is shared and implicitly read-only
|
||||
accountsManager.getByAccountIdentifier(account.getUuid()).ifPresent(accountToUpdate -> {
|
||||
accountsManager.update(accountToUpdate, a -> {
|
||||
for (Device device : a.getDevices()) {
|
||||
if (deviceNeedsUpdate(device)) {
|
||||
if (deviceExpired(device)) {
|
||||
if (StringUtils.isNotEmpty(device.getApnId())) {
|
||||
if (device.getId() == 1) {
|
||||
device.setUserAgent("OWI");
|
||||
} else {
|
||||
device.setUserAgent("OWP");
|
||||
}
|
||||
} else if (StringUtils.isNotEmpty(device.getGcmId())) {
|
||||
device.setUserAgent("OWA");
|
||||
}
|
||||
device.setGcmId(null);
|
||||
device.setApnId(null);
|
||||
device.setVoipApnId(null);
|
||||
device.setFetchesMessages(false);
|
||||
} else {
|
||||
device.setUninstalledFeedbackTimestamp(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}, updateExecutor)
|
||||
.whenComplete((ignored, throwable) -> {
|
||||
if (throwable != null) {
|
||||
log.warn("Failed to update account {}", account.getUuid(), throwable);
|
||||
} else {
|
||||
UPDATED_ACCOUNT_COUNTER.increment();
|
||||
}
|
||||
}))
|
||||
.toList();
|
||||
|
||||
try {
|
||||
CompletableFuture.allOf(updateFutures.toArray(new CompletableFuture[0]))
|
||||
.orTimeout(10, TimeUnit.MINUTES)
|
||||
.join();
|
||||
} catch (final Exception e) {
|
||||
log.debug("Failed to update one or more accounts in chunk", e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean deviceNeedsUpdate(final Device device) {
|
||||
return device.getUninstalledFeedbackTimestamp() != 0 &&
|
||||
device.getUninstalledFeedbackTimestamp() + TimeUnit.DAYS.toMillis(2) <= Util.todayInMillis();
|
||||
}
|
||||
|
||||
private boolean deviceExpired(final Device device) {
|
||||
return device.getLastSeen() + TimeUnit.DAYS.toMillis(2) <= Util.todayInMillis();
|
||||
}
|
||||
}
|
||||
@@ -156,8 +156,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
|
||||
configuration.getDynamoDbTables().getAccounts().getPhoneNumberTableName(),
|
||||
configuration.getDynamoDbTables().getAccounts().getPhoneNumberIdentifierTableName(),
|
||||
configuration.getDynamoDbTables().getAccounts().getUsernamesTableName(),
|
||||
configuration.getDynamoDbTables().getDeletedAccounts().getTableName(),
|
||||
configuration.getDynamoDbTables().getAccounts().getScanPageSize());
|
||||
configuration.getDynamoDbTables().getDeletedAccounts().getTableName());
|
||||
PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(dynamoDbClient,
|
||||
configuration.getDynamoDbTables().getPhoneNumberIdentifiers().getTableName());
|
||||
Profiles profiles = new Profiles(dynamoDbClient, dynamoDbAsyncClient,
|
||||
|
||||
@@ -131,8 +131,7 @@ record CommandDependencies(
|
||||
configuration.getDynamoDbTables().getAccounts().getPhoneNumberTableName(),
|
||||
configuration.getDynamoDbTables().getAccounts().getPhoneNumberIdentifierTableName(),
|
||||
configuration.getDynamoDbTables().getAccounts().getUsernamesTableName(),
|
||||
configuration.getDynamoDbTables().getDeletedAccounts().getTableName(),
|
||||
configuration.getDynamoDbTables().getAccounts().getScanPageSize());
|
||||
configuration.getDynamoDbTables().getDeletedAccounts().getTableName());
|
||||
PhoneNumberIdentifiers phoneNumberIdentifiers = new PhoneNumberIdentifiers(dynamoDbClient,
|
||||
configuration.getDynamoDbTables().getPhoneNumberIdentifiers().getTableName());
|
||||
Profiles profiles = new Profiles(dynamoDbClient, dynamoDbAsyncClient,
|
||||
|
||||
@@ -1,152 +0,0 @@
|
||||
/*
|
||||
* Copyright 2023 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import io.dropwizard.Application;
|
||||
import io.dropwizard.cli.EnvironmentCommand;
|
||||
import io.dropwizard.setup.Environment;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import net.sourceforge.argparse4j.inf.Argument;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentParser;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentParserException;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentType;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawler;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerCache;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerListener;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PushFeedbackProcessor;
|
||||
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
|
||||
|
||||
public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfiguration> {
|
||||
|
||||
private static final String CRAWL_TYPE = "crawlType";
|
||||
private static final String WORKER_COUNT = "workers";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(CrawlAccountsCommand.class);
|
||||
|
||||
public enum CrawlType implements ArgumentType<CrawlType> {
|
||||
GENERAL_PURPOSE,
|
||||
;
|
||||
|
||||
@Override
|
||||
public CrawlType convert(final ArgumentParser parser, final Argument arg, final String value)
|
||||
throws ArgumentParserException {
|
||||
return CrawlType.valueOf(value);
|
||||
}
|
||||
}
|
||||
|
||||
public CrawlAccountsCommand() {
|
||||
super(new Application<>() {
|
||||
@Override
|
||||
public void run(final WhisperServerConfiguration configuration, final Environment environment) throws Exception {
|
||||
|
||||
}
|
||||
}, "crawl-accounts", "Runs account crawler tasks");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
subparser.addArgument("--crawl-type")
|
||||
.type(CrawlType.class)
|
||||
.dest(CRAWL_TYPE)
|
||||
.required(true)
|
||||
.help("The type of crawl to perform");
|
||||
|
||||
subparser.addArgument("--workers")
|
||||
.type(Integer.class)
|
||||
.dest(WORKER_COUNT)
|
||||
.required(true)
|
||||
.help("The number of worker threads");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(final Environment environment, final Namespace namespace,
|
||||
final WhisperServerConfiguration configuration) throws Exception {
|
||||
|
||||
UncaughtExceptionHandler.register();
|
||||
|
||||
MetricsUtil.configureRegistries(configuration, environment);
|
||||
|
||||
final CommandDependencies deps = CommandDependencies.build("account-crawler", environment, configuration);
|
||||
final AccountsManager accountsManager = deps.accountsManager();
|
||||
|
||||
final FaultTolerantRedisCluster cacheCluster = deps.cacheCluster();
|
||||
final FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster",
|
||||
configuration.getMetricsClusterConfiguration(), deps.redisClusterClientResources());
|
||||
|
||||
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
|
||||
new DynamicConfigurationManager<>(configuration.getAppConfig().getApplication(),
|
||||
configuration.getAppConfig().getEnvironment(),
|
||||
configuration.getAppConfig().getConfigurationName(),
|
||||
DynamicConfiguration.class);
|
||||
|
||||
dynamicConfigurationManager.start();
|
||||
MetricsUtil.registerSystemResourceMetrics(environment);
|
||||
|
||||
final int workers = Objects.requireNonNull(namespace.getInt(WORKER_COUNT));
|
||||
|
||||
final AccountDatabaseCrawler crawler = switch ((CrawlType) namespace.get(CRAWL_TYPE)) {
|
||||
case GENERAL_PURPOSE -> {
|
||||
final ExecutorService pushFeedbackUpdateExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "pushFeedback-%d")).maxThreads(workers).minThreads(workers).build();
|
||||
|
||||
// TODO listeners must be ordered so that ones that directly update accounts come last, so that read-only ones are not working with stale data
|
||||
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(
|
||||
// PushFeedbackProcessor may update device properties
|
||||
new PushFeedbackProcessor(accountsManager, pushFeedbackUpdateExecutor));
|
||||
|
||||
final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
|
||||
cacheCluster,
|
||||
AccountDatabaseCrawlerCache.GENERAL_PURPOSE_PREFIX);
|
||||
|
||||
yield new AccountDatabaseCrawler("General-purpose account crawler",
|
||||
accountsManager,
|
||||
accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners,
|
||||
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize()
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
environment.lifecycle().manage(new CommandStopListener(configuration.getCommandStopListener()));
|
||||
|
||||
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
|
||||
try {
|
||||
managedObject.start();
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to start managed object", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
crawler.crawlAllAccounts();
|
||||
} catch (final Exception e) {
|
||||
LoggerFactory.getLogger(CrawlAccountsCommand.class).error("Error crawling accounts", e);
|
||||
}
|
||||
|
||||
environment.lifecycle().getManagedObjects().forEach(managedObject -> {
|
||||
try {
|
||||
managedObject.stop();
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to stop managed object", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user