Add support for server-side feature flags

This commit is contained in:
Jon Chambers
2020-08-26 20:27:33 -04:00
committed by GitHub
parent b9df028bfb
commit 1ef3546822
12 changed files with 511 additions and 2 deletions

View File

@@ -25,6 +25,7 @@ import org.whispersystems.textsecuregcm.configuration.AwsAttachmentsConfiguratio
import org.whispersystems.textsecuregcm.configuration.CdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.DirectoryConfiguration;
import org.whispersystems.textsecuregcm.configuration.FeatureFlagConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.MaxDeviceConfiguration;
@@ -211,6 +212,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private RemoteConfigConfiguration remoteConfig;
@Valid
@NotNull
@JsonProperty
private FeatureFlagConfiguration featureFlag;
private Map<String, String> transparentDataIndex = new HashMap<>();
public RecaptchaConfiguration getRecaptchaConfiguration() {
@@ -354,4 +360,8 @@ public class WhisperServerConfiguration extends Configuration {
public RemoteConfigConfiguration getRemoteConfigConfiguration() {
return remoteConfig;
}
public FeatureFlagConfiguration getFeatureFlagConfiguration() {
return featureFlag;
}
}

View File

@@ -58,7 +58,6 @@ import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccount;
import org.whispersystems.textsecuregcm.auth.DisabledPermittedAccountAuthenticator;
import org.whispersystems.textsecuregcm.auth.ExternalServiceCredentialGenerator;
import org.whispersystems.textsecuregcm.auth.TurnTokenGenerator;
import org.whispersystems.textsecuregcm.configuration.MicrometerConfiguration;
import org.whispersystems.textsecuregcm.controllers.AccountController;
import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV1;
import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV2;
@@ -66,6 +65,7 @@ import org.whispersystems.textsecuregcm.controllers.AttachmentControllerV3;
import org.whispersystems.textsecuregcm.controllers.CertificateController;
import org.whispersystems.textsecuregcm.controllers.DeviceController;
import org.whispersystems.textsecuregcm.controllers.DirectoryController;
import org.whispersystems.textsecuregcm.controllers.FeatureFlagsController;
import org.whispersystems.textsecuregcm.controllers.KeepAliveController;
import org.whispersystems.textsecuregcm.controllers.KeysController;
import org.whispersystems.textsecuregcm.controllers.MessageController;
@@ -122,6 +122,8 @@ import org.whispersystems.textsecuregcm.storage.DirectoryManager;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.storage.Messages;
@@ -163,7 +165,6 @@ import java.security.Security;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -268,6 +269,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Messages messages = new Messages(messageDatabase);
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
RemoteConfigs remoteConfigs = new RemoteConfigs(accountDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
RedisClientFactory pubSubClientFactory = new RedisClientFactory("pubsub_cache", config.getPubsubCacheConfiguration().getUrl(), config.getPubsubCacheConfiguration().getReplicaUrls(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
RedisClientFactory directoryClientFactory = new RedisClientFactory("directory_cache", config.getDirectoryConfiguration().getRedisConfiguration().getUrl(), config.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), config.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration());
@@ -284,6 +286,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration());
ScheduledExecutorService clientPresenceExecutor = environment.lifecycle().scheduledExecutorService("clientPresenceManager").threads(1).build();
ScheduledExecutorService refreshFeatureFlagsExecutor = environment.lifecycle().scheduledExecutorService("featureFlags").threads(1).build();
ExecutorService messageNotificationExecutor = environment.lifecycle().executorService("messageCacheNotifications").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
ExecutorService messageCacheClusterExperimentExecutor = environment.lifecycle().executorService("messages_cache_experiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
ExecutorService websocketExperimentExecutor = environment.lifecycle().executorService("websocketPresenceExperiment").maxThreads(8).workQueue(new ArrayBlockingQueue<>(1_000)).build();
@@ -301,6 +304,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
MessagesManager messagesManager = new MessagesManager(messages, messagesCache, clusterMessagesCache, pushLatencyManager, messageCacheClusterExperimentExecutor);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
FeatureFlagsManager featureFlagsManager = new FeatureFlagsManager(featureFlags, refreshFeatureFlagsExecutor);
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
@@ -372,6 +376,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ProfileController profileController = new ProfileController(rateLimiters, accountsManager, profilesManager, usernamesManager, cdnS3Client, profileCdnPolicyGenerator, profileCdnPolicySigner, config.getCdnConfiguration().getBucket(), zkProfileOperations, isZkEnabled);
StickerController stickerController = new StickerController(rateLimiters, config.getCdnConfiguration().getAccessKey(), config.getCdnConfiguration().getAccessSecret(), config.getCdnConfiguration().getRegion(), config.getCdnConfiguration().getBucket());
RemoteConfigController remoteConfigController = new RemoteConfigController(remoteConfigsManager, config.getRemoteConfigConfiguration().getAuthorizedTokens(), config.getRemoteConfigConfiguration().getGlobalConfig());
FeatureFlagsController featureFlagsController = new FeatureFlagsController(featureFlagsManager, config.getFeatureFlagConfiguration().getAuthorizedTokens());
AuthFilter<BasicCredentials, Account> accountAuthFilter = new BasicCredentialAuthFilter.Builder<Account>().setAuthenticator(accountAuthenticator).buildAuthFilter ();
AuthFilter<BasicCredentials, DisabledPermittedAccount> disabledPermittedAccountAuthFilter = new BasicCredentialAuthFilter.Builder<DisabledPermittedAccount>().setAuthenticator(disabledPermittedAccountAuthenticator).buildAuthFilter();
@@ -400,6 +405,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.jersey().register(profileController);
environment.jersey().register(stickerController);
environment.jersey().register(remoteConfigController);
environment.jersey().register(featureFlagsController);
///
WebSocketEnvironment<Account> webSocketEnvironment = new WebSocketEnvironment<>(environment, config.getWebSocketConfiguration(), 90000);

View File

@@ -0,0 +1,17 @@
package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
import java.util.LinkedList;
import java.util.List;
public class FeatureFlagConfiguration {
@JsonProperty
@NotNull
private List<String> authorizedTokens = new LinkedList<>();
public List<String> getAuthorizedTokens() {
return authorizedTokens;
}
}

View File

@@ -0,0 +1,72 @@
package org.whispersystems.textsecuregcm.controllers;
import com.codahale.metrics.annotation.Timed;
import com.google.common.annotations.VisibleForTesting;
import org.whispersystems.textsecuregcm.storage.FeatureFlagsManager;
import javax.ws.rs.DELETE;
import javax.ws.rs.FormParam;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.stream.Collectors;
@Path("/v1/featureflag")
public class FeatureFlagsController {
private final FeatureFlagsManager featureFlagsManager;
private final List<byte[]> authorizedTokens;
public FeatureFlagsController(final FeatureFlagsManager featureFlagsManager, final List<String> authorizedTokens) {
this.featureFlagsManager = featureFlagsManager;
this.authorizedTokens = authorizedTokens.stream().map(token -> token.getBytes(StandardCharsets.UTF_8)).collect(Collectors.toList());
}
@Timed
@PUT
@Path("/{featureFlag}")
public void set(@HeaderParam("Token") final String token, @PathParam("featureFlag") final String featureFlag, @FormParam("active") final boolean active) {
if (!isAuthorized(token)) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
featureFlagsManager.setFeatureFlag(featureFlag, active);
}
@Timed
@DELETE
@Path("/{featureFlag}")
public void delete(@HeaderParam("Token") final String token, @PathParam("featureFlag") final String featureFlag) {
if (!isAuthorized(token)) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
featureFlagsManager.deleteFeatureFlag(featureFlag);
}
@VisibleForTesting
boolean isAuthorized(final String token) {
if (token == null) {
return false;
}
final byte[] tokenBytes = token.getBytes(StandardCharsets.UTF_8);
boolean authorized = false;
for (final byte[] authorizedToken : authorizedTokens) {
//noinspection IfStatementMissingBreakInLoop
if (MessageDigest.isEqual(authorizedToken, tokenBytes)) {
authorized = true;
}
}
return authorized;
}
}

View File

@@ -0,0 +1,77 @@
package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.jdbi.v3.core.mapper.RowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import java.util.Map;
import java.util.stream.Collectors;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The feature flag database is a persistent store of the state of all server-side feature flags. Feature flags are
* identified by a human-readable name (e.g. "invert-nano-flappers") and are either active or inactive.
* <p/>
* The feature flag database provides the most up-to-date possible view of feature flags, but does so at the cost of
* interacting with a remote data store. In nearly all cases, callers should prefer a cached, eventually-consistent
* view of feature flags (see {@link FeatureFlagsManager}).
* <p/>
* When an operation requiring a feature flag has finished, callers should delete the feature flag to prevent
* accumulation of non-functional flags.
*/
public class FeatureFlags {
private final FaultTolerantDatabase database;
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Timer getAllTimer = metricRegistry.timer(name(getClass(), "getAll"));
private final Timer updateTimer = metricRegistry.timer(name(getClass(), "update"));
private final Timer deleteTimer = metricRegistry.timer(name(getClass(), "delete"));
private final Timer vacuumTimer = metricRegistry.timer(name(getClass(), "vacuum"));
private static final RowMapper<Pair<String, Boolean>> PAIR_ROW_MAPPER = (resultSet, statementContext) ->
new Pair<>(resultSet.getString("flag"), resultSet.getBoolean("active"));
public FeatureFlags(final FaultTolerantDatabase database) {
this.database = database;
}
public Map<String, Boolean> getFeatureFlags() {
try (final Timer.Context ignored = getAllTimer.time()) {
return database.with(jdbi -> jdbi.withHandle(handle -> handle.createQuery("SELECT flag, active FROM feature_flags")
.map(PAIR_ROW_MAPPER)
.list()
.stream()
.collect(Collectors.toMap(Pair::first, Pair::second))));
}
}
public void setFlag(final String featureFlag, final boolean active) {
try (final Timer.Context ignored = updateTimer.time()) {
database.use(jdbi -> jdbi.withHandle(handle -> handle.createUpdate("INSERT INTO feature_flags (flag, active) VALUES (:featureFlag, :active) ON CONFLICT (flag) DO UPDATE SET active = EXCLUDED.active")
.bind("featureFlag", featureFlag)
.bind("active", active)
.execute()));
}
}
public void deleteFlag(final String featureFlag) {
try (final Timer.Context ignored = deleteTimer.time()) {
database.use(jdbi -> jdbi.withHandle(handle -> handle.createUpdate("DELETE FROM feature_flags WHERE flag = :featureFlag")
.bind("featureFlag", featureFlag)
.execute()));
}
}
public void vacuum() {
try (final Timer.Context ignored = vacuumTimer.time()) {
database.use(jdbi -> jdbi.useHandle(handle -> {
handle.execute("VACUUM feature_flags");
}));
}
}
}

View File

@@ -0,0 +1,82 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The feature flags manager provides a high-throughput, eventually-consistent view of feature flags. This is the main
* channel through which callers should interact with feature flags.
* <p/>
* Feature flags are intended to provide temporary control over server-side features (i.e. for migrations or experiments
* with new services). Each flag is identified by a human-readable name (e.g. "invert-nano-flappers") and is either
* active or inactive. Flags (including flags that have not been set) are inactive by default.
*/
public class FeatureFlagsManager implements Managed {
private final FeatureFlags featureFlagDatabase;
private final ScheduledExecutorService refreshExecutorService;
private ScheduledFuture<?> refreshFuture;
private final AtomicReference<Map<String, Boolean>> featureFlags = new AtomicReference<>(Collections.emptyMap());
private static final String GAUGE_NAME = "status";
private static final String FLAG_TAG_NAME = "flag";
private static final Duration REFRESH_INTERVAL = Duration.ofSeconds(30);
public FeatureFlagsManager(final FeatureFlags featureFlagDatabase, final ScheduledExecutorService refreshExecutorService) {
this.featureFlagDatabase = featureFlagDatabase;
this.refreshExecutorService = refreshExecutorService;
refreshFeatureFlags();
}
@Override
public void start() {
refreshFuture = refreshExecutorService.scheduleAtFixedRate(this::refreshFeatureFlags, 0, REFRESH_INTERVAL.toSeconds(), TimeUnit.SECONDS);
}
@Override
public void stop() {
refreshFuture.cancel(false);
}
public boolean isFeatureFlagActive(final String featureFlag) {
return featureFlags.get().getOrDefault(featureFlag, false);
}
public void setFeatureFlag(final String featureFlag, final boolean active) {
featureFlagDatabase.setFlag(featureFlag, active);
refreshFeatureFlags();
}
public void deleteFeatureFlag(final String featureFlag) {
featureFlagDatabase.deleteFlag(featureFlag);
refreshFeatureFlags();
}
@VisibleForTesting
void refreshFeatureFlags() {
final Map<String, Boolean> refreshedFeatureFlags = featureFlagDatabase.getFeatureFlags();
featureFlags.set(refreshedFeatureFlags);
for (final Map.Entry<String, Boolean> entry : refreshedFeatureFlags.entrySet()) {
final String featureFlag = entry.getKey();
final boolean active = entry.getValue();
Metrics.gauge(name(getClass(), GAUGE_NAME), List.of(Tag.of(FLAG_TAG_NAME, featureFlag)), active ? 1 : 0);
}
}
}

View File

@@ -8,6 +8,7 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatabaseConfiguration;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
import org.whispersystems.textsecuregcm.storage.FeatureFlags;
import org.whispersystems.textsecuregcm.storage.Keys;
import org.whispersystems.textsecuregcm.storage.Messages;
import org.whispersystems.textsecuregcm.storage.PendingAccounts;
@@ -43,6 +44,7 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
Keys keys = new Keys(accountDatabase);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
Messages messages = new Messages(messageDatabase);
FeatureFlags featureFlags = new FeatureFlags(accountDatabase);
logger.info("Vacuuming accounts...");
accounts.vacuum();
@@ -56,6 +58,9 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
logger.info("Vacuuming messages...");
messages.vacuum();
logger.info("Vacuuming feature flags...");
featureFlags.vacuum();
Thread.sleep(3000);
System.exit(0);
}