Use the S3 object monitor to retrieve Tor exit node lists.

This commit is contained in:
Jon Chambers
2021-05-17 17:58:13 -04:00
committed by Jon Chambers
parent cfa8cbedc1
commit fbaf4a09e2
4 changed files with 79 additions and 183 deletions

View File

@@ -397,7 +397,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExecutorService gcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "gcmSender-%d")).maxThreads(1).minThreads(1).build();
ExecutorService backupServiceExecutor = environment.lifecycle().executorService(name(getClass(), "backupService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle().executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService torExitNodeExecutor = environment.lifecycle().executorService(name(getClass(), "torExitNode-%d")).maxThreads(1).minThreads(1).build();
ExecutorService donationExecutor = environment.lifecycle().executorService(name(getClass(), "donation-%d")).maxThreads(1).minThreads(1).build();
ExternalServiceCredentialGenerator directoryCredentialsGenerator = new ExternalServiceCredentialGenerator(config.getDirectoryConfiguration().getDirectoryClientConfiguration().getUserAuthenticationTokenSharedSecret(),
@@ -437,7 +436,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
GCMSender gcmSender = new GCMSender(gcmSenderExecutor, accountsManager, config.getGcmConfiguration().getApiKey());
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), dynamicConfigurationManager, rateLimitersCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
TorExitNodeManager torExitNodeManager = new TorExitNodeManager(recurringJobExecutor, torExitNodeExecutor, config.getTorExitNodeConfiguration());
TorExitNodeManager torExitNodeManager = new TorExitNodeManager(recurringJobExecutor, config.getTorExitNodeConfiguration());
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
DisabledPermittedAccountAuthenticator disabledPermittedAccountAuthenticator = new DisabledPermittedAccountAuthenticator(accountsManager);

View File

@@ -15,37 +15,37 @@ public class TorExitNodeConfiguration {
@JsonProperty
@NotBlank
private String listUrl;
private String s3Region;
@JsonProperty
@NotBlank
private String s3Bucket;
@JsonProperty
@NotBlank
private String objectKey;
@JsonProperty
private Duration refreshInterval = Duration.ofMinutes(5);
@JsonProperty
@Valid
private CircuitBreakerConfiguration circuitBreakerConfiguration = new CircuitBreakerConfiguration();
@JsonProperty
@Valid
private RetryConfiguration retryConfiguration = new RetryConfiguration();
public String getListUrl() {
return listUrl;
public String getS3Region() {
return s3Region;
}
@VisibleForTesting
public void setListUrl(final String listUrl) {
this.listUrl = listUrl;
public void setS3Region(final String s3Region) {
this.s3Region = s3Region;
}
public String getS3Bucket() {
return s3Bucket;
}
public String getObjectKey() {
return objectKey;
}
public Duration getRefreshInterval() {
return refreshInterval;
}
public CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
return circuitBreakerConfiguration;
}
public RetryConfiguration getRetryConfiguration() {
return retryConfiguration;
}
}

View File

@@ -5,32 +5,25 @@
package org.whispersystems.textsecuregcm.util;
import static com.codahale.metrics.MetricRegistry.name;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.TorExitNodeConfiguration;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
import static com.codahale.metrics.MetricRegistry.name;
/**
* A utility for checking whether IP addresses belong to Tor exit nodes using the "bulk exit list."
@@ -39,14 +32,8 @@ import static com.codahale.metrics.MetricRegistry.name;
*/
public class TorExitNodeManager implements Managed {
private final ScheduledExecutorService refreshScheduledExecutorService;
private final Duration refreshDelay;
private ScheduledFuture<?> refreshFuture;
private final S3ObjectMonitor exitListMonitor;
private final FaultTolerantHttpClient refreshClient;
private final URI exitNodeListUri;
private final AtomicReference<String> lastEtag = new AtomicReference<>(null);
private final AtomicReference<Set<String>> exitNodeAddresses = new AtomicReference<>(Collections.emptySet());
private static final Timer REFRESH_TIMER = Metrics.timer(name(TorExitNodeManager.class, "refresh"));
@@ -54,78 +41,45 @@ public class TorExitNodeManager implements Managed {
private static final Logger log = LoggerFactory.getLogger(TorExitNodeManager.class);
public TorExitNodeManager(final ScheduledExecutorService scheduledExecutorService, final ExecutorService clientExecutorService, final TorExitNodeConfiguration configuration) {
this.refreshScheduledExecutorService = scheduledExecutorService;
public TorExitNodeManager(
final ScheduledExecutorService scheduledExecutorService,
final TorExitNodeConfiguration configuration) {
this.exitNodeListUri = URI.create(configuration.getListUrl());
this.refreshDelay = configuration.getRefreshInterval();
this.exitListMonitor = new S3ObjectMonitor(
configuration.getS3Region(),
configuration.getS3Bucket(),
configuration.getObjectKey(),
scheduledExecutorService,
configuration.getRefreshInterval(),
this::handleExitListChanged);
}
refreshClient = FaultTolerantHttpClient.newBuilder()
.withCircuitBreaker(configuration.getCircuitBreakerConfiguration())
.withRetry(configuration.getRetryConfiguration())
.withVersion(HttpClient.Version.HTTP_1_1)
.withConnectTimeout(Duration.ofSeconds(10))
.withRedirect(HttpClient.Redirect.NEVER)
.withExecutor(clientExecutorService)
.withName("tor-exit-node")
.withSecurityProtocol(FaultTolerantHttpClient.SECURITY_PROTOCOL_TLS_1_3)
.build();
@Override
public synchronized void start() {
exitListMonitor.refresh();
exitListMonitor.start();
}
@Override
public synchronized void stop() {
exitListMonitor.stop();
}
public boolean isTorExitNode(final String address) {
return exitNodeAddresses.get().contains(address);
}
private void handleExitListChanged(final S3Object exitList) {
REFRESH_TIMER.record(() -> handleExitListChanged(exitList.getObjectContent()));
}
@VisibleForTesting
CompletableFuture<?> refresh() {
final String etag = lastEtag.get();
final HttpRequest request;
{
final HttpRequest.Builder builder = HttpRequest.newBuilder().GET().uri(exitNodeListUri);
if (StringUtils.isNotBlank(etag)) {
builder.header("If-None-Match", etag);
}
request = builder.build();
}
final long start = System.nanoTime();
return refreshClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.whenComplete((response, cause) -> {
REFRESH_TIMER.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (cause != null) {
REFRESH_ERRORS.increment();
log.warn("Failed to refresh Tor exit node list", cause);
} else {
if (response.statusCode() == 200) {
exitNodeAddresses.set(response.body().lines().collect(Collectors.toSet()));
response.headers().firstValue("ETag").ifPresent(newEtag -> lastEtag.compareAndSet(etag, newEtag));
} else if (response.statusCode() != 304) {
REFRESH_ERRORS.increment();
log.warn("Failed to refresh Tor exit node list: {} ({})", response.statusCode(), response.body());
}
}
});
}
@Override
public synchronized void start() {
if (refreshFuture != null) {
refreshFuture.cancel(true);
}
refreshFuture = refreshScheduledExecutorService
.scheduleAtFixedRate(this::refresh, 0, refreshDelay.toMillis(), TimeUnit.MILLISECONDS);
}
@Override
public synchronized void stop() {
if (refreshFuture != null) {
refreshFuture.cancel(true);
void handleExitListChanged(final InputStream inputStream) {
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
exitNodeAddresses.set(reader.lines().collect(Collectors.toSet()));
} catch (final Exception e) {
REFRESH_ERRORS.increment();
log.warn("Failed to refresh Tor exit node list", e);
}
}
}