diff --git a/service/config/sample.yml b/service/config/sample.yml index 24a35bcbe..c7976c7e9 100644 --- a/service/config/sample.yml +++ b/service/config/sample.yml @@ -542,3 +542,10 @@ idlePrimaryDeviceReminder: grpc: port: 50051 + +asnTable: + s3Region: a-region + s3Bucket: a-bucket + objectKey: asn.tsv + maxSize: 100000 + refreshInterval: PT10S diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 2d717a4bf..d5dd0d133 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -348,6 +348,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private GrpcConfiguration grpc; + @Valid + @NotNull + @JsonProperty + private S3ObjectMonitorFactory asnTable; + public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() { return tlsKeyStore; } @@ -582,4 +587,8 @@ public class WhisperServerConfiguration extends Configuration { public GrpcConfiguration getGrpc() { return grpc; } + + public S3ObjectMonitorFactory getAsnTableConfiguration() { + return asnTable; + } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 4b808bf0c..61a7e94d3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -73,6 +73,8 @@ import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation; import org.signal.libsignal.zkgroup.receipts.ServerZkReceiptOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.asn.AsnInfoProvider; +import org.whispersystems.textsecuregcm.asn.AsnInfoProviderImpl; import org.whispersystems.textsecuregcm.attachments.GcsAttachmentGenerator; import org.whispersystems.textsecuregcm.attachments.TusAttachmentGenerator; import org.whispersystems.textsecuregcm.auth.AccountAuthenticator; @@ -200,6 +202,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient; import org.whispersystems.textsecuregcm.s3.PolicySigner; import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator; +import org.whispersystems.textsecuregcm.s3.S3MonitoringSupplier; import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient; import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient; import org.whispersystems.textsecuregcm.spam.ChallengeConstraintChecker; @@ -603,6 +606,14 @@ public class WhisperServerService extends Application asnInfoProviderSupplier = new S3MonitoringSupplier<>( + recurringJobExecutor, + awsCredentialsProvider, + config.getAsnTableConfiguration(), + AsnInfoProviderImpl::fromTsvGz, + AsnInfoProvider.EMPTY, + "AsnManager"); + RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager = new RegistrationRecoveryPasswordsManager(registrationRecoveryPasswords); UsernameHashZkProofVerifier usernameHashZkProofVerifier = new UsernameHashZkProofVerifier(); @@ -750,6 +761,8 @@ public class WhisperServerService extends Application lookup(@Nonnull String ipString); + + AsnInfoProvider EMPTY = _ -> Optional.empty(); +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/asn/AsnInfoProviderImpl.java b/service/src/main/java/org/whispersystems/textsecuregcm/asn/AsnInfoProviderImpl.java new file mode 100644 index 000000000..e7713c83b --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/asn/AsnInfoProviderImpl.java @@ -0,0 +1,165 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.asn; + +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.invoke.MethodHandles; +import java.math.BigInteger; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.zip.GZIPInputStream; +import javax.annotation.Nonnull; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code AsnInfoProvider} implementation that supports both IPv4 and IPv6. + */ +public class AsnInfoProviderImpl implements AsnInfoProvider { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Nonnull + private final NavigableMap> asnBlocksByFirstIpv4; + + @Nonnull + private final NavigableMap> asnBlocksByFirstIpv6; + + + /** + * Creates an instance of {@code AsnInfoProviderImpl} using data from iptoasn.com. + * @param tsvGzInputStream gzip input stream representing the data. + */ + @Nonnull + public static AsnInfoProviderImpl fromTsvGz(@Nonnull final InputStream tsvGzInputStream) { + try (final GZIPInputStream inputStream = new GZIPInputStream(tsvGzInputStream)) { + return fromTsv(inputStream); + } catch (final IOException e) { + log.error("failed to ungzip the input stream", e); + throw new RuntimeException(e); + } + } + + /** + * Creates an instance of {@code AsnInfoProviderImpl} using data from iptoasn.com. + * @param tsvInputStream input stream representing the data. + */ + @Nonnull + public static AsnInfoProviderImpl fromTsv(@Nonnull final InputStream tsvInputStream) { + try (final InputStreamReader tsvReader = new InputStreamReader(tsvInputStream)) { + final NavigableMap> ip4asns = new TreeMap<>(); + final NavigableMap> ip6asns = new TreeMap<>(); + final Map asnInfoCache = new HashMap<>(); + + try (final CSVParser csvParser = CSVFormat.TDF.parse(tsvReader)) { + for (final CSVRecord record : csvParser) { + // format: + // range_start_ip_string range_end_ip_string AS_number country_code AS_description + final InetAddress startIp = InetAddress.getByName(record.get(0)); + final InetAddress endIp = InetAddress.getByName(record.get(1)); + final long asn = Long.parseLong(record.get(2)); + final String regionCode = record.get(3); + // country code should be the same for any ASN, so we're caching AsnInfo objects + // not to have multiple instances with the same values + final AsnInfo asnInfo = asnInfoCache.computeIfAbsent(asn, k -> new AsnInfo(asn, regionCode)); + if (!regionCode.equals(asnInfo.regionCode())) { + log.warn("ASN {} mapped to country codes {} and {}", asn, regionCode, asnInfo.regionCode()); + } + + // IPv4 + if (startIp instanceof Inet4Address) { + final AsnRange asnRange = new AsnRange<>( + ip4BytesToLong((Inet4Address) startIp), + ip4BytesToLong((Inet4Address) endIp), + asnInfo + ); + ip4asns.put(asnRange.from(), asnRange); + } + + // IPv6 + if (startIp instanceof Inet6Address) { + final AsnRange asnRange = new AsnRange<>( + ip6BytesToBigInteger((Inet6Address) startIp), + ip6BytesToBigInteger((Inet6Address) endIp), + asnInfo + ); + ip6asns.put(asnRange.from(), asnRange); + } + } + } + return new AsnInfoProviderImpl(ip4asns, ip6asns); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public AsnInfoProviderImpl( + @Nonnull final NavigableMap> asnBlocksByFirstIpv4, + @Nonnull final NavigableMap> asnBlocksByFirstIpv6) { + this.asnBlocksByFirstIpv4 = requireNonNull(asnBlocksByFirstIpv4); + this.asnBlocksByFirstIpv6 = requireNonNull(asnBlocksByFirstIpv6); + } + + @Nonnull + @Override + public Optional lookup(@Nonnull final String ipString) { + try { + final InetAddress address = InetAddress.getByName(ipString); + if (address instanceof Inet4Address ip4) { + final Long key = ip4BytesToLong(ip4); + return lookupInMap(asnBlocksByFirstIpv4, key); + } + if (address instanceof Inet6Address ip6) { + final BigInteger key = ip6BytesToBigInteger(ip6); + return lookupInMap(asnBlocksByFirstIpv6, key); + } + // safety net, should never happen + log.warn("Unknown InetAddress implementation: {}", address.getClass().getName()); + } catch (final Exception e) { + log.error("Could not resolve ASN for IP string {}", ipString); + } + return Optional.empty(); + } + + @VisibleForTesting + protected static long ip4BytesToLong(@Nonnull final Inet4Address address) { + final byte[] arr = address.getAddress(); + Validate.isTrue(arr.length == 4); + return Integer.toUnsignedLong(ByteBuffer.wrap(arr).getInt()); + } + + @VisibleForTesting + protected static BigInteger ip6BytesToBigInteger(@Nonnull final Inet6Address address) { + final byte[] arr = address.getAddress(); + Validate.isTrue(arr.length == 16); + return new BigInteger(1, arr); + } + + @Nonnull + private static > Optional lookupInMap( + @Nonnull final NavigableMap> map, + @Nonnull final T key) { + return Optional.ofNullable(map.floorEntry(key)) + .filter(e -> e.getValue().contains(key) && e.getValue().asnInfo().asn() != 0) + .map(e -> e.getValue().asnInfo()); + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/asn/AsnRange.java b/service/src/main/java/org/whispersystems/textsecuregcm/asn/AsnRange.java new file mode 100644 index 000000000..1b6cfaff8 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/asn/AsnRange.java @@ -0,0 +1,28 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.asn; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.Nonnull; +import org.apache.commons.lang3.Validate; + +public record AsnRange>(@Nonnull T from, + @Nonnull T to, + @Nonnull AsnInfo asnInfo) { + public AsnRange { + requireNonNull(from); + requireNonNull(to); + requireNonNull(asnInfo); + Validate.isTrue(from.compareTo(to) <= 0); + } + + boolean contains(@Nonnull final T element) { + requireNonNull(element); + return from.compareTo(element) <= 0 + && element.compareTo(to) <= 0; + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/s3/ManagedSupplier.java b/service/src/main/java/org/whispersystems/textsecuregcm/s3/ManagedSupplier.java new file mode 100644 index 000000000..20ed60318 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/s3/ManagedSupplier.java @@ -0,0 +1,22 @@ +/* + * Copyright 2013 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.s3; + +import io.dropwizard.lifecycle.Managed; +import java.util.function.Supplier; + +public interface ManagedSupplier extends Supplier, Managed { + + @Override + default void start() throws Exception { + // noop + } + + @Override + default void stop() throws Exception { + // noop + } +} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/s3/S3MonitoringSupplier.java b/service/src/main/java/org/whispersystems/textsecuregcm/s3/S3MonitoringSupplier.java new file mode 100644 index 000000000..aa829a613 --- /dev/null +++ b/service/src/main/java/org/whispersystems/textsecuregcm/s3/S3MonitoringSupplier.java @@ -0,0 +1,80 @@ +/* + * Copyright 2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.s3; + +import static java.util.Objects.requireNonNull; +import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.configuration.S3ObjectMonitorFactory; +import org.whispersystems.textsecuregcm.s3.ManagedSupplier; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +public class S3MonitoringSupplier implements ManagedSupplier { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final Timer refreshTimer; + + private final Counter refreshErrors; + + private final AtomicReference holder; + + private final S3ObjectMonitor monitor; + + private final Function parser; + + + public S3MonitoringSupplier( + final ScheduledExecutorService executor, + final AwsCredentialsProvider awsCredentialsProvider, + final S3ObjectMonitorFactory cfg, + final Function parser, + final T initial, + final String name) { + this.refreshTimer = Metrics.timer(name(S3MonitoringSupplier.class, name, "refresh")); + this.refreshErrors = Metrics.counter(name(S3MonitoringSupplier.class, name, "refreshErrors")); + this.holder = new AtomicReference<>(initial); + this.parser = requireNonNull(parser); + this.monitor = cfg.build(awsCredentialsProvider, executor); + } + + @Override + public T get() { + return requireNonNull(holder.get()); + } + + @Override + public void start() throws Exception { + monitor.start(this::handleObjectChange); + } + + @Override + public void stop() throws Exception { + monitor.stop(); + } + + private void handleObjectChange(final InputStream inputStream) { + refreshTimer.record(() -> { + // parser function is supposed to close the input stream + try { + holder.set(parser.apply(inputStream)); + } catch (final Exception e) { + log.error("failed to update internal state from the monitored object", e); + refreshErrors.increment(); + } + }); + } +} diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/asn/AsnInfoProviderImplTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/asn/AsnInfoProviderImplTest.java new file mode 100644 index 000000000..a16cdc07b --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/asn/AsnInfoProviderImplTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2013-2021 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.textsecuregcm.asn; + +import static java.util.Objects.requireNonNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.whispersystems.textsecuregcm.asn.AsnInfoProviderImpl.ip4BytesToLong; +import static org.whispersystems.textsecuregcm.asn.AsnInfoProviderImpl.ip6BytesToBigInteger; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import org.junit.jupiter.api.Test; + +public class AsnInfoProviderImplTest { + + private static final String RESOURCE_NAME = "ip2asn-test.tsv"; + + @SuppressWarnings("OptionalGetWithoutIsPresent") + @Test + void testAsnInfo() throws IOException { + try (final InputStream tsvInputStream = getClass().getResourceAsStream(RESOURCE_NAME)) { + final AsnInfoProvider asnInfoProvider = AsnInfoProviderImpl.fromTsv(requireNonNull(tsvInputStream)); + assertEquals(16625L, asnInfoProvider.lookup("2.16.112.0").get().asn()); + assertEquals(16625L, asnInfoProvider.lookup("2.16.112.255").get().asn()); + assertEquals(16625L, asnInfoProvider.lookup("2.16.113.0").get().asn()); + assertEquals(16625L, asnInfoProvider.lookup("2.16.113.123").get().asn()); + assertEquals(16625L, asnInfoProvider.lookup("2.16.113.255").get().asn()); + + assertEquals("US", asnInfoProvider.lookup("2.16.113.255").get().regionCode()); + + assertEquals(4690L, asnInfoProvider.lookup("2001:200:e00::").get().asn()); + assertEquals(4690L, asnInfoProvider.lookup("2001:200:ef0::").get().asn()); + assertEquals(4690L, asnInfoProvider.lookup("2001:200:eff:ffff::").get().asn()); + assertEquals(4690L, asnInfoProvider.lookup("2001:200:eff:ffff:ffff:ffff:ffff:ffff").get().asn()); + + assertEquals("JP", asnInfoProvider.lookup("2001:200:eff:ffff:ffff:ffff:ffff:ffff").get().regionCode()); + + assertTrue(asnInfoProvider.lookup("1.3.0.0").isEmpty()); + assertTrue(asnInfoProvider.lookup("1.4.127.255").isEmpty()); + assertTrue(asnInfoProvider.lookup("2001:4:113::").isEmpty()); + assertTrue(asnInfoProvider.lookup("0.0.0.0").isEmpty()); + assertTrue(asnInfoProvider.lookup("127.0.0.1").isEmpty()); + assertTrue(asnInfoProvider.lookup("not an ip").isEmpty()); + } + } + + @Test + public void testBytesToLong() throws Exception { + assertEquals(0x00000000ffffffffL, ip4BytesToLong((Inet4Address) InetAddress.getByName("255.255.255.255"))); + assertEquals(0x0000000000000001L, ip4BytesToLong((Inet4Address) InetAddress.getByName("0.0.0.1"))); + assertEquals(0x00000000ff00ff01L, ip4BytesToLong((Inet4Address) InetAddress.getByName("255.0.255.1"))); + + final BigInteger start = ip6BytesToBigInteger((Inet6Address) InetAddress.getByName("2c0f:fff1:0:0:0:0:0:0")); + final BigInteger end = ip6BytesToBigInteger((Inet6Address) InetAddress.getByName("fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")); + assertTrue(start.compareTo(BigInteger.ZERO) >= 0); + assertTrue(end.compareTo(BigInteger.ZERO) >= 0); + assertTrue(start.compareTo(end) <= 0); + } +} diff --git a/service/src/test/resources/config/test.yml b/service/src/test/resources/config/test.yml index ea6893f43..bfcd30405 100644 --- a/service/src/test/resources/config/test.yml +++ b/service/src/test/resources/config/test.yml @@ -538,3 +538,10 @@ idlePrimaryDeviceReminder: grpc: port: 50051 + +asnTable: + s3Region: a-region + s3Bucket: a-bucket + objectKey: asn.tsv + maxSize: 100000 + refreshInterval: PT10S diff --git a/service/src/test/resources/org/whispersystems/textsecuregcm/asn/ip2asn-test.tsv b/service/src/test/resources/org/whispersystems/textsecuregcm/asn/ip2asn-test.tsv new file mode 100644 index 000000000..c35258c11 --- /dev/null +++ b/service/src/test/resources/org/whispersystems/textsecuregcm/asn/ip2asn-test.tsv @@ -0,0 +1,6 @@ +1.3.0.0 1.4.127.255 0 None Not routed +1.0.0.0 1.0.0.255 13335 US CLOUDFLARENET +2.16.112.0 2.16.113.255 16625 US AKAMAI-AS +2001:4:113:: 2001:1ff:ffff:ffff:ffff:ffff:ffff:ffff 0 None Not routed +2001:200:e00:: 2001:200:eff:ffff:ffff:ffff:ffff:ffff 4690 JP WIDE-SFO WIDE Project +2001:420:4488:: 2001:420:c0ef:ffff:ffff:ffff:ffff:ffff 109 US CISCOSYSTEMS diff --git a/service/src/test/resources/org/whispersystems/textsecuregcm/util/ip2asn-test.tsv b/service/src/test/resources/org/whispersystems/textsecuregcm/util/ip2asn-test.tsv deleted file mode 100644 index 08c4ce2ba..000000000 --- a/service/src/test/resources/org/whispersystems/textsecuregcm/util/ip2asn-test.tsv +++ /dev/null @@ -1,3 +0,0 @@ -95865344 95865855 0 None Not routed -458051584 458227711 7552 VN VIETEL-AS-AP Viettel Group -843841536 844103679 7922 US COMCAST-7922 - Comcast Cable Communications, LLC