Introduce a system for mapping IP addresses to ASNs

This commit is contained in:
Jon Chambers
2025-10-09 12:31:26 -04:00
committed by Jon Chambers
parent 73765fc4ec
commit c9760f4c38
13 changed files with 443 additions and 3 deletions

View File

@@ -348,6 +348,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private GrpcConfiguration grpc;
@Valid
@NotNull
@JsonProperty
private S3ObjectMonitorFactory asnTable;
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
return tlsKeyStore;
}
@@ -582,4 +587,8 @@ public class WhisperServerConfiguration extends Configuration {
public GrpcConfiguration getGrpc() {
return grpc;
}
public S3ObjectMonitorFactory getAsnTableConfiguration() {
return asnTable;
}
}

View File

@@ -73,6 +73,8 @@ import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation;
import org.signal.libsignal.zkgroup.receipts.ServerZkReceiptOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.asn.AsnInfoProvider;
import org.whispersystems.textsecuregcm.asn.AsnInfoProviderImpl;
import org.whispersystems.textsecuregcm.attachments.GcsAttachmentGenerator;
import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
@@ -200,6 +202,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient;
import org.whispersystems.textsecuregcm.s3.PolicySigner;
import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator;
import org.whispersystems.textsecuregcm.s3.S3MonitoringSupplier;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient;
import org.whispersystems.textsecuregcm.spam.ChallengeConstraintChecker;
@@ -603,6 +606,14 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExternalServiceCredentialsGenerator svrbCredentialsGenerator =
SecureValueRecoveryBCredentialsGeneratorFactory.svrbCredentialsGenerator(config.getSvrbConfiguration());
final S3MonitoringSupplier<AsnInfoProvider> asnInfoProviderSupplier = new S3MonitoringSupplier<>(
recurringJobExecutor,
awsCredentialsProvider,
config.getAsnTableConfiguration(),
AsnInfoProviderImpl::fromTsvGz,
AsnInfoProvider.EMPTY,
"AsnManager");
RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager =
new RegistrationRecoveryPasswordsManager(registrationRecoveryPasswords);
UsernameHashZkProofVerifier usernameHashZkProofVerifier = new UsernameHashZkProofVerifier();
@@ -750,6 +761,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getAppleAppStore().appleRootCerts(),
config.getAppleAppStore().retryConfigurationName());
environment.lifecycle().manage(asnInfoProviderSupplier);
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(pushNotificationScheduler);
environment.lifecycle().manage(provisioningManager);

View File

@@ -0,0 +1,18 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.asn;
import static java.util.Objects.requireNonNull;
import com.google.i18n.phonenumbers.PhoneNumberUtil;
import javax.annotation.Nonnull;
public record AsnInfo(long asn, @Nonnull String regionCode) {
public AsnInfo {
requireNonNull(regionCode, "regionCode must not be null");
}
}

View File

@@ -0,0 +1,21 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.asn;
import java.util.Optional;
import javax.annotation.Nonnull;
public interface AsnInfoProvider {
/// Gets ASN information for an IP address.
///
/// @param ipString a string representation of an IP address
///
/// @return ASN information for the given IP address or empty if no ASN information was found for the given IP address
Optional<AsnInfo> lookup(@Nonnull String ipString);
AsnInfoProvider EMPTY = _ -> Optional.empty();
}

View File

@@ -0,0 +1,165 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.asn;
import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles;
import java.math.BigInteger;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import javax.annotation.Nonnull;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@code AsnInfoProvider} implementation that supports both IPv4 and IPv6.
*/
public class AsnInfoProviderImpl implements AsnInfoProvider {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Nonnull
private final NavigableMap<Long, AsnRange<Long>> asnBlocksByFirstIpv4;
@Nonnull
private final NavigableMap<BigInteger, AsnRange<BigInteger>> asnBlocksByFirstIpv6;
/**
* Creates an instance of {@code AsnInfoProviderImpl} using data from <a href="https://iptoasn.com/">iptoasn.com</a>.
* @param tsvGzInputStream gzip input stream representing the data.
*/
@Nonnull
public static AsnInfoProviderImpl fromTsvGz(@Nonnull final InputStream tsvGzInputStream) {
try (final GZIPInputStream inputStream = new GZIPInputStream(tsvGzInputStream)) {
return fromTsv(inputStream);
} catch (final IOException e) {
log.error("failed to ungzip the input stream", e);
throw new RuntimeException(e);
}
}
/**
* Creates an instance of {@code AsnInfoProviderImpl} using data from <a href="https://iptoasn.com/">iptoasn.com</a>.
* @param tsvInputStream input stream representing the data.
*/
@Nonnull
public static AsnInfoProviderImpl fromTsv(@Nonnull final InputStream tsvInputStream) {
try (final InputStreamReader tsvReader = new InputStreamReader(tsvInputStream)) {
final NavigableMap<Long, AsnRange<Long>> ip4asns = new TreeMap<>();
final NavigableMap<BigInteger, AsnRange<BigInteger>> ip6asns = new TreeMap<>();
final Map<Long, AsnInfo> asnInfoCache = new HashMap<>();
try (final CSVParser csvParser = CSVFormat.TDF.parse(tsvReader)) {
for (final CSVRecord record : csvParser) {
// format:
// range_start_ip_string range_end_ip_string AS_number country_code AS_description
final InetAddress startIp = InetAddress.getByName(record.get(0));
final InetAddress endIp = InetAddress.getByName(record.get(1));
final long asn = Long.parseLong(record.get(2));
final String regionCode = record.get(3);
// country code should be the same for any ASN, so we're caching AsnInfo objects
// not to have multiple instances with the same values
final AsnInfo asnInfo = asnInfoCache.computeIfAbsent(asn, k -> new AsnInfo(asn, regionCode));
if (!regionCode.equals(asnInfo.regionCode())) {
log.warn("ASN {} mapped to country codes {} and {}", asn, regionCode, asnInfo.regionCode());
}
// IPv4
if (startIp instanceof Inet4Address) {
final AsnRange<Long> asnRange = new AsnRange<>(
ip4BytesToLong((Inet4Address) startIp),
ip4BytesToLong((Inet4Address) endIp),
asnInfo
);
ip4asns.put(asnRange.from(), asnRange);
}
// IPv6
if (startIp instanceof Inet6Address) {
final AsnRange<BigInteger> asnRange = new AsnRange<>(
ip6BytesToBigInteger((Inet6Address) startIp),
ip6BytesToBigInteger((Inet6Address) endIp),
asnInfo
);
ip6asns.put(asnRange.from(), asnRange);
}
}
}
return new AsnInfoProviderImpl(ip4asns, ip6asns);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public AsnInfoProviderImpl(
@Nonnull final NavigableMap<Long, AsnRange<Long>> asnBlocksByFirstIpv4,
@Nonnull final NavigableMap<BigInteger, AsnRange<BigInteger>> asnBlocksByFirstIpv6) {
this.asnBlocksByFirstIpv4 = requireNonNull(asnBlocksByFirstIpv4);
this.asnBlocksByFirstIpv6 = requireNonNull(asnBlocksByFirstIpv6);
}
@Nonnull
@Override
public Optional<AsnInfo> lookup(@Nonnull final String ipString) {
try {
final InetAddress address = InetAddress.getByName(ipString);
if (address instanceof Inet4Address ip4) {
final Long key = ip4BytesToLong(ip4);
return lookupInMap(asnBlocksByFirstIpv4, key);
}
if (address instanceof Inet6Address ip6) {
final BigInteger key = ip6BytesToBigInteger(ip6);
return lookupInMap(asnBlocksByFirstIpv6, key);
}
// safety net, should never happen
log.warn("Unknown InetAddress implementation: {}", address.getClass().getName());
} catch (final Exception e) {
log.error("Could not resolve ASN for IP string {}", ipString);
}
return Optional.empty();
}
@VisibleForTesting
protected static long ip4BytesToLong(@Nonnull final Inet4Address address) {
final byte[] arr = address.getAddress();
Validate.isTrue(arr.length == 4);
return Integer.toUnsignedLong(ByteBuffer.wrap(arr).getInt());
}
@VisibleForTesting
protected static BigInteger ip6BytesToBigInteger(@Nonnull final Inet6Address address) {
final byte[] arr = address.getAddress();
Validate.isTrue(arr.length == 16);
return new BigInteger(1, arr);
}
@Nonnull
private static <T extends Comparable<T>> Optional<AsnInfo> lookupInMap(
@Nonnull final NavigableMap<T, AsnRange<T>> map,
@Nonnull final T key) {
return Optional.ofNullable(map.floorEntry(key))
.filter(e -> e.getValue().contains(key) && e.getValue().asnInfo().asn() != 0)
.map(e -> e.getValue().asnInfo());
}
}

View File

@@ -0,0 +1,28 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.asn;
import static java.util.Objects.requireNonNull;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
public record AsnRange<T extends Comparable<T>>(@Nonnull T from,
@Nonnull T to,
@Nonnull AsnInfo asnInfo) {
public AsnRange {
requireNonNull(from);
requireNonNull(to);
requireNonNull(asnInfo);
Validate.isTrue(from.compareTo(to) <= 0);
}
boolean contains(@Nonnull final T element) {
requireNonNull(element);
return from.compareTo(element) <= 0
&& element.compareTo(to) <= 0;
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright 2013 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.s3;
import io.dropwizard.lifecycle.Managed;
import java.util.function.Supplier;
public interface ManagedSupplier<T> extends Supplier<T>, Managed {
@Override
default void start() throws Exception {
// noop
}
@Override
default void stop() throws Exception {
// noop
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.s3;
import static java.util.Objects.requireNonNull;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.S3ObjectMonitorFactory;
import org.whispersystems.textsecuregcm.s3.ManagedSupplier;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
public class S3MonitoringSupplier<T> implements ManagedSupplier<T> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Timer refreshTimer;
private final Counter refreshErrors;
private final AtomicReference<T> holder;
private final S3ObjectMonitor monitor;
private final Function<InputStream, T> parser;
public S3MonitoringSupplier(
final ScheduledExecutorService executor,
final AwsCredentialsProvider awsCredentialsProvider,
final S3ObjectMonitorFactory cfg,
final Function<InputStream, T> parser,
final T initial,
final String name) {
this.refreshTimer = Metrics.timer(name(S3MonitoringSupplier.class, name, "refresh"));
this.refreshErrors = Metrics.counter(name(S3MonitoringSupplier.class, name, "refreshErrors"));
this.holder = new AtomicReference<>(initial);
this.parser = requireNonNull(parser);
this.monitor = cfg.build(awsCredentialsProvider, executor);
}
@Override
public T get() {
return requireNonNull(holder.get());
}
@Override
public void start() throws Exception {
monitor.start(this::handleObjectChange);
}
@Override
public void stop() throws Exception {
monitor.stop();
}
private void handleObjectChange(final InputStream inputStream) {
refreshTimer.record(() -> {
// parser function is supposed to close the input stream
try {
holder.set(parser.apply(inputStream));
} catch (final Exception e) {
log.error("failed to update internal state from the monitored object", e);
refreshErrors.increment();
}
});
}
}