Remove second database for AbusiveHostRules

This commit is contained in:
Chris Eager
2021-12-07 12:34:23 -08:00
committed by Chris Eager
parent dba1711e8d
commit 0ce87153e5
9 changed files with 8 additions and 329 deletions

View File

@@ -215,11 +215,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private DatabaseConfiguration abuseDatabase;
@Valid
@NotNull
@JsonProperty
private DatabaseConfiguration newAbuseDatabase;
@Valid
@NotNull
@JsonProperty
@@ -461,10 +456,6 @@ public class WhisperServerConfiguration extends Configuration {
return abuseDatabase;
}
public DatabaseConfiguration getNewAbuseDatabaseConfiguration() {
return newAbuseDatabase;
}
public RateLimitsConfiguration getLimitsConfiguration() {
return limits;
}

View File

@@ -214,7 +214,6 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.MigrateAbusiveHostRulesCommand;
import org.whispersystems.textsecuregcm.workers.ReserveUsernameCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
@@ -243,7 +242,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new CheckDynamicConfigurationCommand());
bootstrap.addCommand(new SetUserDiscoverabilityCommand());
bootstrap.addCommand(new ReserveUsernameCommand());
bootstrap.addCommand(new MigrateAbusiveHostRulesCommand());
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("abusedb", "abusedb.xml") {
@Override
@@ -312,10 +310,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi,
config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
Jdbi newAbuseJdbi = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration(), "abusedb2");
FaultTolerantDatabase newAbuseDatabase = new FaultTolerantDatabase("abuse_database2", newAbuseJdbi,
config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbFromConfig.asyncClient(
config.getDynamoDbClientConfiguration(),
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
@@ -394,8 +388,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb,
config.getMessageDynamoDbConfiguration().getTableName(),
config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase, newAbuseDatabase,
dynamicConfigurationManager);
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(dynamoDbClient,
config.getDynamoDbTables().getRemoteConfig().getTableName());
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(pushChallengeDynamoDbClient,

View File

@@ -1,32 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DynamicAbusiveHostRulesMigrationConfiguration {
@JsonProperty
private boolean newReadEnabled = false;
@JsonProperty
private boolean newWriteEnabled = false;
@JsonProperty
private boolean newPrimary = false;
public boolean isNewReadEnabled() {
return newReadEnabled;
}
public boolean isNewWriteEnabled() {
return newWriteEnabled;
}
public boolean isNewPrimary() {
return newPrimary;
}
}

View File

@@ -55,14 +55,6 @@ public class DynamicConfiguration {
@Valid
private DynamicPushLatencyConfiguration pushLatency = new DynamicPushLatencyConfiguration(Collections.emptyMap());
@JsonProperty
@Valid
private DynamicProfileMigrationConfiguration profileMigration = new DynamicProfileMigrationConfiguration();
@JsonProperty
@Valid
private DynamicAbusiveHostRulesMigrationConfiguration abusiveHostRulesMigration = new DynamicAbusiveHostRulesMigrationConfiguration();
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
@@ -118,11 +110,4 @@ public class DynamicConfiguration {
return pushLatency;
}
public DynamicProfileMigrationConfiguration getProfileMigrationConfiguration() {
return profileMigration;
}
public DynamicAbusiveHostRulesMigrationConfiguration getAbusiveHostRulesMigrationConfiguration() {
return abusiveHostRulesMigration;
}
}

View File

@@ -1,46 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DynamicProfileMigrationConfiguration {
@JsonProperty
private boolean dynamoDbDeleteEnabled = false;
@JsonProperty
private boolean dynamoDbWriteEnabled = false;
@JsonProperty
private boolean dynamoDbReadForComparisonEnabled = false;
@JsonProperty
private boolean dynamoDbReadPrimary = false;
@JsonProperty
private boolean logMismatches = false;
public boolean isDynamoDbDeleteEnabled() {
return dynamoDbDeleteEnabled;
}
public boolean isDynamoDbWriteEnabled() {
return dynamoDbWriteEnabled;
}
public boolean isDynamoDbReadForComparisonEnabled() {
return dynamoDbReadForComparisonEnabled;
}
public boolean isDynamoDbReadPrimary() {
return dynamoDbReadPrimary;
}
public boolean isLogMismatches() {
return logMismatches;
}
}

View File

@@ -10,17 +10,11 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.base.Suppliers;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.storage.mappers.AbusiveHostRuleRowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
public class AbusiveHostRules {
@@ -36,26 +30,16 @@ public class AbusiveHostRules {
private final Timer getTimer = metricRegistry.timer(name(AbusiveHostRules.class, "get"));
private final Timer insertTimer = metricRegistry.timer(name(AbusiveHostRules.class, "setBlockedHost"));
private final FaultTolerantDatabase oldDatabase;
private final FaultTolerantDatabase newDatabase;
private final FaultTolerantDatabase database;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final Experiment migrationExperiment = new Experiment("abusiveHostRulesMigration");
public AbusiveHostRules(FaultTolerantDatabase database) {
public AbusiveHostRules(FaultTolerantDatabase oldDatabase, FaultTolerantDatabase newDatabase,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.oldDatabase = oldDatabase;
this.oldDatabase.getDatabase().registerRowMapper(new AbusiveHostRuleRowMapper());
this.newDatabase = newDatabase;
this.newDatabase.getDatabase().registerRowMapper(new AbusiveHostRuleRowMapper());
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.database = database;
this.database.getDatabase().registerRowMapper(new AbusiveHostRuleRowMapper());
}
public List<AbusiveHostRule> getAbusiveHostRulesFor(String host) {
final List<AbusiveHostRule> oldDbRules = oldDatabase.with(jdbi -> jdbi.withHandle(handle -> {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context timer = getTimer.time()) {
return handle.createQuery("SELECT * FROM abusive_host_rules WHERE :host::inet <<= " + HOST)
.bind("host", host)
@@ -63,25 +47,10 @@ public class AbusiveHostRules {
.list();
}
}));
final Supplier<List<AbusiveHostRule>> newDbRules = Suppliers.memoize(
() -> newDatabase.with(jdbi -> jdbi.withHandle(
handle -> handle.createQuery("SELECT * FROM abusive_host_rules WHERE :host::inet <<= " + HOST)
.bind("host", host)
.mapTo(AbusiveHostRule.class)
.list())));
if (dynamicConfigurationManager.getConfiguration().getAbusiveHostRulesMigrationConfiguration().isNewReadEnabled()) {
migrationExperiment.compareSupplierResult(oldDbRules, newDbRules);
}
return dynamicConfigurationManager.getConfiguration().getAbusiveHostRulesMigrationConfiguration().isNewPrimary()
? newDbRules.get()
: oldDbRules;
}
public void setBlockedHost(String host, String notes) {
oldDatabase.use(jdbi -> jdbi.useHandle(handle -> {
database.use(jdbi -> jdbi.useHandle(handle -> {
try (Timer.Context timer = insertTimer.time()) {
handle.createUpdate(
"INSERT INTO abusive_host_rules(host, blocked, notes) VALUES(:host::inet, :blocked, :notes) ON CONFLICT DO NOTHING")
@@ -91,47 +60,6 @@ public class AbusiveHostRules {
.execute();
}
}));
if (dynamicConfigurationManager.getConfiguration().getAbusiveHostRulesMigrationConfiguration()
.isNewWriteEnabled()) {
try {
newDatabase.use(jdbi -> jdbi.useHandle(handle -> handle.createUpdate(
"INSERT INTO abusive_host_rules(host, blocked, notes) VALUES(:host::inet, :blocked, :notes) ON CONFLICT DO NOTHING")
.bind("host", host)
.bind("blocked", 1)
.bind("notes", notes)
.execute()));
} catch (final Exception e) {
logger.warn("Failed to insert rule in new database", e);
}
}
}
public int migrateAbusiveHostRule(AbusiveHostRule rule, String notes) {
return newDatabase.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context timer = insertTimer.time()) {
return handle.createUpdate(
"INSERT INTO abusive_host_rules(host, blocked, notes, regions) VALUES(:host::inet, :blocked, :notes, :regions) ON CONFLICT DO NOTHING")
.bind("host", rule.host())
.bind("blocked", rule.blocked() ? 1 : 0)
.bind("notes", notes)
.bind("regions", String.join(",", rule.regions()))
.execute();
}
}));
}
public void forEachInOldDatabase(final BiConsumer<AbusiveHostRule, String> consumer, final int fetchSize) {
final AbusiveHostRuleRowMapper rowMapper = new AbusiveHostRuleRowMapper();
oldDatabase.use(jdbi -> jdbi.useHandle(handle -> handle.useTransaction(transactionHandle ->
transactionHandle.createQuery("SELECT * FROM abusive_host_rules")
.setFetchSize(fetchSize)
.map((resultSet, ctx) -> {
AbusiveHostRule rule = rowMapper.map(resultSet, ctx);
String notes = resultSet.getString(NOTES);
return new Pair<>(rule, notes);
})
.forEach(pair -> consumer.accept(pair.first(), pair.second())))));
}
}

View File

@@ -1,92 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.AbusiveHostRules;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import java.util.concurrent.atomic.AtomicInteger;
public class MigrateAbusiveHostRulesCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private static final Logger log = LoggerFactory.getLogger(MigrateAbusiveHostRulesCommand.class);
public MigrateAbusiveHostRulesCommand() {
super(new Application<>() {
@Override
public void run(WhisperServerConfiguration configuration, Environment environment) {
}
}, "migrate-abusive-host-rules", "Migrate abusive host rules from one Postgres to another");
}
@Override
public void configure(Subparser subparser) {
super.configure(subparser);
subparser.addArgument("-s", "--fetch-size")
.dest("fetchSize")
.type(Integer.class)
.required(false)
.setDefault(512)
.help("The number of rules to fetch from Postgres at once");
}
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration config) throws Exception {
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
new DynamicConfigurationManager<>(config.getAppConfig().getApplication(),
config.getAppConfig().getEnvironment(),
config.getAppConfig().getConfigurationName(),
DynamicConfiguration.class);
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
Jdbi abuseJdbi = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration(), "abusedb");
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi,
config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
Jdbi newAbuseJdbi = jdbiFactory.build(environment, config.getNewAbuseDatabaseConfiguration(), "abusedb2");
FaultTolerantDatabase newAbuseDatabase = new FaultTolerantDatabase("abuse_database2", newAbuseJdbi,
config.getNewAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
log.info("Beginning migration");
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase, newAbuseDatabase,
dynamicConfigurationManager);
final int fetchSize = namespace.getInt("fetchSize");
final AtomicInteger rulesMigrated = new AtomicInteger(0);
abusiveHostRules.forEachInOldDatabase((rule, notes) -> {
abusiveHostRules.migrateAbusiveHostRule(rule, notes);
int migrated = rulesMigrated.incrementAndGet();
if (migrated % 1_000 == 0) {
log.info("Migrated {} rules", migrated);
}
}, fetchSize);
log.info("Migration complete ({} total rules)", rulesMigrated.get());
}
}