Collapse the feature flag system into the dynamic config system.

This commit is contained in:
Jon Chambers
2021-02-12 11:52:40 -05:00
committed by Jon Chambers
parent d6319aeb92
commit ff448950ed
17 changed files with 127 additions and 610 deletions

View File

@@ -139,8 +139,6 @@ import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@@ -166,13 +164,10 @@ import org.whispersystems.textsecuregcm.websocket.DeadLetterHandler;
import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener;
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.DeleteFeatureFlagTask;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.GetRedisCommandStatsCommand;
import org.whispersystems.textsecuregcm.workers.GetRedisSlowlogCommand;
import org.whispersystems.textsecuregcm.workers.ListFeatureFlagsTask;
import org.whispersystems.textsecuregcm.workers.SetCrawlerAccelerationTask;
import org.whispersystems.textsecuregcm.workers.SetFeatureFlagTask;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
@@ -284,7 +279,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(messageDynamoDb, config.getMessageDynamoDbConfiguration().getTableName(), config.getMessageDynamoDbConfiguration().getTimeToLive());
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
ReplicatedJedisPool pubsubClient = pubSubClientFactory.getRedisClientPool();
@@ -330,7 +324,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager);
AccountsManager accountsManager = new AccountsManager(accounts, cacheCluster, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, recurringJobExecutor);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
@@ -358,7 +351,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, featureFlagsManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
MessagePersister messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, Duration.ofMinutes(config.getMessageCacheConfiguration().getPersistDelayMinutes()));
final List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = new ArrayList<>();
accountDatabaseCrawlerListeners.add(new PushFeedbackProcessor(accountsManager, directoryQueue));
@@ -383,7 +376,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(messagePersister);
environment.lifecycle().manage(clientPresenceManager);
environment.lifecycle().manage(featureFlagsManager);
AWSCredentials credentials = new BasicAWSCredentials(config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret());
AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
@@ -472,9 +464,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.admin().addTask(new SetRequestLoggingEnabledTask());
environment.admin().addTask(new SetCrawlerAccelerationTask(accountDatabaseCrawlerCache));
environment.admin().addTask(new ListFeatureFlagsTask(featureFlagsManager));
environment.admin().addTask(new SetFeatureFlagTask(featureFlagsManager));
environment.admin().addTask(new DeleteFeatureFlagTask(featureFlagsManager));
///

View File

@@ -6,6 +6,7 @@ import javax.validation.Valid;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class DynamicConfiguration {
@@ -21,6 +22,9 @@ public class DynamicConfiguration {
@Valid
private DynamicRemoteDeprecationConfiguration remoteDeprecation = new DynamicRemoteDeprecationConfiguration();
@JsonProperty
private Set<String> featureFlags = Collections.emptySet();
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
}
@@ -32,4 +36,8 @@ public class DynamicConfiguration {
public DynamicRemoteDeprecationConfiguration getRemoteDeprecationConfiguration() {
return remoteDeprecation;
}
public Set<String> getActiveFeatureFlags() {
return featureFlags;
}
}

View File

@@ -1,82 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.jdbi.v3.core.mapper.RowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import java.util.Map;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The feature flag database is a persistent store of the state of all server-side feature flags. Feature flags are
* identified by a human-readable name (e.g. "invert-nano-flappers") and are either active or inactive.
* <p/>
* The feature flag database provides the most up-to-date possible view of feature flags, but does so at the cost of
* interacting with a remote data store. In nearly all cases, callers should prefer a cached, eventually-consistent
* view of feature flags (see {@link FeatureFlagsManager}).
* <p/>
* When an operation requiring a feature flag has finished, callers should delete the feature flag to prevent
* accumulation of non-functional flags.
*/
public class FeatureFlags {
private final FaultTolerantDatabase database;
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer getAllTimer = metricRegistry.timer(name(getClass(), "getAll"));
private final Timer updateTimer = metricRegistry.timer(name(getClass(), "update"));
private final Timer deleteTimer = metricRegistry.timer(name(getClass(), "delete"));
private final Timer vacuumTimer = metricRegistry.timer(name(getClass(), "vacuum"));
private static final RowMapper<Pair<String, Boolean>> PAIR_ROW_MAPPER = (resultSet, statementContext) ->
new Pair<>(resultSet.getString("flag"), resultSet.getBoolean("active"));
public FeatureFlags(final FaultTolerantDatabase database) {
this.database = database;
}
public Map<String, Boolean> getFeatureFlags() {
try (final Timer.Context ignored = getAllTimer.time()) {
return database.with(jdbi -> jdbi.withHandle(handle -> handle.createQuery("SELECT flag, active FROM feature_flags")
.map(PAIR_ROW_MAPPER)
.list()
.stream()
.collect(Collectors.toMap(Pair::first, Pair::second))));
}
}
public void setFlag(final String featureFlag, final boolean active) {
try (final Timer.Context ignored = updateTimer.time()) {
database.use(jdbi -> jdbi.withHandle(handle -> handle.createUpdate("INSERT INTO feature_flags (flag, active) VALUES (:featureFlag, :active) ON CONFLICT (flag) DO UPDATE SET active = EXCLUDED.active")
.bind("featureFlag", featureFlag)
.bind("active", active)
.execute()));
}
}
public void deleteFlag(final String featureFlag) {
try (final Timer.Context ignored = deleteTimer.time()) {
database.use(jdbi -> jdbi.withHandle(handle -> handle.createUpdate("DELETE FROM feature_flags WHERE flag = :featureFlag")
.bind("featureFlag", featureFlag)
.execute()));
}
}
public void vacuum() {
try (final Timer.Context ignored = vacuumTimer.time()) {
database.use(jdbi -> jdbi.useHandle(handle -> {
handle.execute("VACUUM feature_flags");
}));
}
}
}

View File

@@ -1,102 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The feature flags manager provides a high-throughput, eventually-consistent view of feature flags. This is the main
* channel through which callers should interact with feature flags.
* <p/>
* Feature flags are intended to provide temporary control over server-side features (i.e. for migrations or experiments
* with new services). Each flag is identified by a human-readable name (e.g. "invert-nano-flappers") and is either
* active or inactive. Flags (including flags that have not been set) are inactive by default.
*/
public class FeatureFlagsManager implements Managed {
private final FeatureFlags featureFlagDatabase;
private final ScheduledExecutorService refreshExecutorService;
private ScheduledFuture<?> refreshFuture;
private final AtomicReference<Map<String, Boolean>> featureFlags = new AtomicReference<>(Collections.emptyMap());
private final Set<Gauge> gauges = new HashSet<>();
private static final String GAUGE_NAME = "status";
private static final String FLAG_TAG_NAME = "flag";
private static final Duration REFRESH_INTERVAL = Duration.ofSeconds(30);
public FeatureFlagsManager(final FeatureFlags featureFlagDatabase, final ScheduledExecutorService refreshExecutorService) {
this.featureFlagDatabase = featureFlagDatabase;
this.refreshExecutorService = refreshExecutorService;
refreshFeatureFlags();
}
@Override
public void start() {
refreshFuture = refreshExecutorService.scheduleAtFixedRate(this::refreshFeatureFlags, 0, REFRESH_INTERVAL.toSeconds(), TimeUnit.SECONDS);
}
@Override
public void stop() {
refreshFuture.cancel(false);
}
public boolean isFeatureFlagActive(final String featureFlag) {
return featureFlags.get().getOrDefault(featureFlag, false);
}
public void setFeatureFlag(final String featureFlag, final boolean active) {
featureFlagDatabase.setFlag(featureFlag, active);
refreshFeatureFlags();
}
public void deleteFeatureFlag(final String featureFlag) {
featureFlagDatabase.deleteFlag(featureFlag);
refreshFeatureFlags();
}
public Map<String, Boolean> getAllFlags() {
return featureFlags.get();
}
@VisibleForTesting
void refreshFeatureFlags() {
final Map<String, Boolean> refreshedFeatureFlags = featureFlagDatabase.getFeatureFlags();
featureFlags.set(Collections.unmodifiableMap(refreshedFeatureFlags));
for (final Gauge gauge : gauges) {
Metrics.globalRegistry.remove(gauge);
}
gauges.clear();
for (final Map.Entry<String, Boolean> entry : refreshedFeatureFlags.entrySet()) {
final String featureFlag = entry.getKey();
final boolean active = entry.getValue();
gauges.add(Gauge.builder(name(getClass(), GAUGE_NAME), () -> active ? 1 : 0)
.tag(FLAG_TAG_NAME, featureFlag)
.register(Metrics.globalRegistry));
}
}
}

View File

@@ -28,10 +28,9 @@ import static com.codahale.metrics.MetricRegistry.name;
public class MessagePersister implements Managed {
private final MessagesCache messagesCache;
private final MessagesManager messagesManager;
private final AccountsManager accountsManager;
private final FeatureFlagsManager featureFlagsManager;
private final MessagesCache messagesCache;
private final MessagesManager messagesManager;
private final AccountsManager accountsManager;
private final Duration persistDelay;
@@ -49,21 +48,21 @@ public class MessagePersister implements Managed {
static final int QUEUE_BATCH_LIMIT = 100;
static final int MESSAGE_BATCH_LIMIT = 100;
private static final String DISABLE_PERSISTER_FEATURE_FLAG = "DISABLE_MESSAGE_PERSISTER";
private static final int WORKER_THREAD_COUNT = 4;
private static final Logger logger = LoggerFactory.getLogger(MessagePersister.class);
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final FeatureFlagsManager featureFlagsManager, final Duration persistDelay) {
this.messagesCache = messagesCache;
this.messagesManager = messagesManager;
this.accountsManager = accountsManager;
this.featureFlagsManager = featureFlagsManager;
this.persistDelay = persistDelay;
public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, final DynamicConfigurationManager dynamicConfigurationManager, final Duration persistDelay) {
this.messagesCache = messagesCache;
this.messagesManager = messagesManager;
this.accountsManager = accountsManager;
this.persistDelay = persistDelay;
for (int i = 0; i < workerThreads.length; i++) {
workerThreads[i] = new Thread(() -> {
while (running) {
if (featureFlagsManager.isFeatureFlagActive("DISABLE_MESSAGE_PERSISTER")) {
if (dynamicConfigurationManager.getConfiguration().getActiveFeatureFlags().contains(DISABLE_PERSISTER_FEATURE_FLAG)) {
Util.sleep(1000);
} else {
try {

View File

@@ -1,31 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.servlets.tasks.Task;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import java.io.PrintWriter;
public abstract class AbstractFeatureFlagTask extends Task {
private final FeatureFlagsManager featureFlagsManager;
protected AbstractFeatureFlagTask(final String name, final FeatureFlagsManager featureFlagsManager) {
super(name);
this.featureFlagsManager = featureFlagsManager;
}
protected FeatureFlagsManager getFeatureFlagsManager() {
return featureFlagsManager;
}
protected void printFeatureFlags(final PrintWriter out) {
out.println("Feature flags:");
featureFlagsManager.getAllFlags().forEach((flag, active) -> out.println(flag + ": " + active));
}
}

View File

@@ -1,35 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class DeleteFeatureFlagTask extends AbstractFeatureFlagTask {
public DeleteFeatureFlagTask(final FeatureFlagsManager featureFlagsManager) {
super("delete-feature-flag", featureFlagsManager);
}
@Override
public void execute(final Map<String, List<String>> parameters, final PrintWriter out) {
if (parameters.containsKey("flag")) {
for (final String flag : parameters.getOrDefault("flag", Collections.emptyList())) {
out.println("Deleting feature flag: " + flag);
getFeatureFlagsManager().deleteFeatureFlag(flag);
}
out.println();
printFeatureFlags(out);
} else {
out.println("Usage: delete-feature-flag?flag=FLAG_NAME[&flag=FLAG_NAME2&flag=...]");
}
}
}

View File

@@ -1,24 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
public class ListFeatureFlagsTask extends AbstractFeatureFlagTask {
public ListFeatureFlagsTask(final FeatureFlagsManager featureFlagsManager) {
super("list-feature-flags", featureFlagsManager);
}
@Override
public void execute(final Map<String, List<String>> parameters, final PrintWriter out) {
printFeatureFlags(out);
}
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class SetFeatureFlagTask extends AbstractFeatureFlagTask {
public SetFeatureFlagTask(final FeatureFlagsManager featureFlagsManager) {
super("set-feature-flag", featureFlagsManager);
}
@Override
public void execute(final Map<String, List<String>> parameters, final PrintWriter out) {
final Optional<String> maybeFlag = Optional.ofNullable(parameters.get("flag"))
.flatMap(values -> values.stream().findFirst());
final Optional<Boolean> maybeActive = Optional.ofNullable(parameters.get("active"))
.flatMap(values -> values.stream().findFirst())
.map(Boolean::valueOf);
if (maybeFlag.isPresent() && maybeActive.isPresent()) {
getFeatureFlagsManager().setFeatureFlag(maybeFlag.get(), maybeActive.get());
out.format("Set %s to %s\n", maybeFlag.get(), maybeActive.get());
out.println();
printFeatureFlags(out);
} else {
out.println("Usage: set-feature-flag?flag=FLAG_NAME&active=[true|false]");
}
}
}

View File

@@ -13,7 +13,6 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
import io.dropwizard.cli.ConfiguredCommand;
@@ -40,7 +39,6 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
Accounts accounts = new Accounts(accountDatabase);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
logger.info("Vacuuming accounts...");
accounts.vacuum();
@@ -48,9 +46,6 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
logger.info("Vacuuming pending_accounts...");
pendingAccounts.vacuum();
logger.info("Vacuuming feature flags...");
featureFlags.vacuum();
Thread.sleep(3000);
System.exit(0);
}