Add support for doing normal CDS queries on CDSv2.

This commit is contained in:
Greyson Parrelli
2022-08-31 15:38:22 -04:00
parent 9b17e7a7e2
commit 2eba9a8d72
10 changed files with 274 additions and 156 deletions

View File

@@ -80,7 +80,9 @@ object ContactDiscovery {
descriptor = "refresh-all",
refresh = {
if (FeatureFlags.phoneNumberPrivacy()) {
ContactDiscoveryRefreshV2.refreshAll(context)
ContactDiscoveryRefreshV2.refreshAll(context, useCompat = false)
} else if (FeatureFlags.cdsV2Compat()) {
ContactDiscoveryRefreshV2.refreshAll(context, useCompat = true)
} else if (FeatureFlags.cdsV2LoadTesting()) {
loadTestRefreshAll(context)
} else {
@@ -103,7 +105,9 @@ object ContactDiscovery {
descriptor = "refresh-multiple",
refresh = {
if (FeatureFlags.phoneNumberPrivacy()) {
ContactDiscoveryRefreshV2.refresh(context, recipients)
ContactDiscoveryRefreshV2.refresh(context, recipients, useCompat = false)
} else if (FeatureFlags.cdsV2Compat()) {
ContactDiscoveryRefreshV2.refresh(context, recipients, useCompat = true)
} else if (FeatureFlags.cdsV2LoadTesting()) {
loadTestRefresh(context, recipients)
} else {
@@ -124,7 +128,9 @@ object ContactDiscovery {
descriptor = "refresh-single",
refresh = {
if (FeatureFlags.phoneNumberPrivacy()) {
ContactDiscoveryRefreshV2.refresh(context, listOf(recipient))
ContactDiscoveryRefreshV2.refresh(context, listOf(recipient), useCompat = false)
} else if (FeatureFlags.cdsV2Compat()) {
ContactDiscoveryRefreshV2.refresh(context, listOf(recipient), useCompat = true)
} else if (FeatureFlags.cdsV2LoadTesting()) {
loadTestRefresh(context, listOf(recipient))
} else {
@@ -381,14 +387,14 @@ object ContactDiscovery {
private fun loadTestRefreshAll(context: Context): RefreshResult {
return loadTestOperation(
{ ContactDiscoveryRefreshV1.refreshAll(context) },
{ ContactDiscoveryRefreshV2.refreshAll(context, ignoreResults = true) }
{ ContactDiscoveryRefreshV2.refreshAll(context, useCompat = false, ignoreResults = true) }
)
}
private fun loadTestRefresh(context: Context, recipients: List<Recipient>): RefreshResult {
return loadTestOperation(
{ ContactDiscoveryRefreshV1.refresh(context, recipients) },
{ ContactDiscoveryRefreshV2.refresh(context, recipients, ignoreResults = true) }
{ ContactDiscoveryRefreshV2.refresh(context, recipients, useCompat = false, ignoreResults = true) }
)
}

View File

@@ -122,7 +122,7 @@ class ContactDiscoveryRefreshV1 {
if (result.getNumberRewrites().size() > 0) {
Log.i(TAG, "[getDirectoryResult] Need to rewrite some numbers.");
recipientDatabase.updatePhoneNumbers(result.getNumberRewrites());
recipientDatabase.rewritePhoneNumbers(result.getNumberRewrites());
}
Map<RecipientId, ACI> aciMap = recipientDatabase.bulkProcessCdsResult(result.getRegisteredNumbers());
@@ -250,8 +250,8 @@ class ContactDiscoveryRefreshV1 {
KeyStore iasKeyStore = getIasKeyStore(context);
try {
Map<String, ACI> results = accountManager.getRegisteredUsers(iasKeyStore, sanitizedNumbers, BuildConfig.CDS_MRENCLAVE);
FuzzyPhoneNumberHelper.OutputResult outputResult = FuzzyPhoneNumberHelper.generateOutput(results, inputResult);
Map<String, ACI> results = accountManager.getRegisteredUsers(iasKeyStore, sanitizedNumbers, BuildConfig.CDS_MRENCLAVE);
FuzzyPhoneNumberHelper.OutputResult<ACI> outputResult = FuzzyPhoneNumberHelper.generateOutput(results, inputResult);
return new ContactIntersection(outputResult.getNumbers(), outputResult.getRewrites(), ignoredNumbers);
} catch (SignatureException | UnauthenticatedQuoteException | UnauthenticatedResponseException | Quote.InvalidQuoteFormatException | InvalidKeyException e) {

View File

@@ -4,20 +4,25 @@ import android.content.Context
import androidx.annotation.WorkerThread
import org.signal.contacts.SystemContactsRepository
import org.signal.core.util.Stopwatch
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.logging.Log
import org.signal.libsignal.zkgroup.profiles.ProfileKey
import org.thoughtcrime.securesms.BuildConfig
import org.thoughtcrime.securesms.database.RecipientDatabase
import org.thoughtcrime.securesms.contacts.sync.FuzzyPhoneNumberHelper.InputResult
import org.thoughtcrime.securesms.contacts.sync.FuzzyPhoneNumberHelper.OutputResult
import org.thoughtcrime.securesms.database.RecipientDatabase.CdsV2Result
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.phonenumbers.PhoneNumberFormatter
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.whispersystems.signalservice.api.push.ServiceId
import org.whispersystems.signalservice.api.push.ACI
import org.whispersystems.signalservice.api.services.CdsiV2Service
import java.io.IOException
import java.util.Optional
import java.util.concurrent.Callable
import java.util.concurrent.Future
/**
* Performs the CDS refresh using the V2 interface (either CDSH or CDSI) that returns both PNIs and ACIs.
@@ -39,149 +44,190 @@ object ContactDiscoveryRefreshV2 {
@WorkerThread
@Synchronized
@JvmStatic
fun refreshAll(context: Context, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
val stopwatch = Stopwatch("refresh-all")
val previousE164s: Set<String> = if (SignalStore.misc().cdsToken != null) {
SignalDatabase.cds.getAllE164s()
} else {
Log.w(TAG, "No token set! Cannot provide previousE164s.")
emptySet()
}
stopwatch.split("previous")
fun refreshAll(context: Context, useCompat: Boolean, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
val recipientE164s: Set<String> = SignalDatabase.recipients.getAllE164s().sanitize()
val newRecipientE164s: Set<String> = recipientE164s - previousE164s
stopwatch.split("recipient")
val systemE164s: Set<String> = SystemContactsRepository.getAllDisplayNumbers(context).toE164s(context).sanitize()
val newSystemE164s: Set<String> = systemE164s - previousE164s
stopwatch.split("system")
val newE164s: Set<String> = newRecipientE164s + newSystemE164s
if (newE164s.isEmpty() && previousE164s.isEmpty()) {
return ContactDiscovery.RefreshResult(emptySet(), emptyMap())
}
val tokenToUse: ByteArray? = if (previousE164s.isNotEmpty()) {
SignalStore.misc().cdsToken
} else {
if (SignalStore.misc().cdsToken != null) {
Log.w(TAG, "We have a token, but our previousE164 list is empty! We cannot provide a token.")
}
null
}
val response: CdsiV2Service.Response = makeRequest(
previousE164s = previousE164s,
newE164s = newE164s,
serviceIds = SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
token = tokenToUse,
return refreshInternal(
recipientE164s = recipientE164s,
systemE164s = systemE164s,
inputPreviousE164s = SignalDatabase.cds.getAllE164s(),
saveToken = true,
tag = "refresh-all"
useCompat = useCompat,
ignoreResults = ignoreResults
)
stopwatch.split("network")
SignalDatabase.cds.updateAfterCdsQuery(newE164s, recipientE164s + systemE164s)
stopwatch.split("cds-db")
var registeredIds: Set<RecipientId> = emptySet()
if (ignoreResults) {
Log.w(TAG, "[refresh-all] Ignoring CDSv2 results.")
} else {
registeredIds = SignalDatabase.recipients.bulkProcessCdsV2Result(
response.results
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
)
stopwatch.split("recipient-db")
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
stopwatch.split("update-registered")
}
stopwatch.stop(TAG)
Log.d(TAG, "[refresh-all] Used ${response.quotaUsedDebugOnly} units of our quota.")
return ContactDiscovery.RefreshResult(registeredIds, emptyMap())
}
@Throws(IOException::class)
@WorkerThread
@Synchronized
@JvmStatic
fun refresh(context: Context, inputRecipients: List<Recipient>, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
val stopwatch = Stopwatch("refresh-some")
val recipients = inputRecipients.map { it.resolve() }
stopwatch.split("resolve")
val inputIds: Set<RecipientId> = recipients.map { it.id }.toSet()
fun refresh(context: Context, inputRecipients: List<Recipient>, useCompat: Boolean, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
val recipients: List<Recipient> = inputRecipients.map { it.resolve() }
val inputE164s: Set<String> = recipients.mapNotNull { it.e164.orElse(null) }.toSet()
if (inputE164s.size > MAXIMUM_ONE_OFF_REQUEST_SIZE) {
return if (inputE164s.size > MAXIMUM_ONE_OFF_REQUEST_SIZE) {
Log.i(TAG, "List of specific recipients to refresh is too large! (Size: ${recipients.size}). Doing a full refresh instead.")
val fullResult: ContactDiscovery.RefreshResult = refreshAll(context, ignoreResults)
return ContactDiscovery.RefreshResult(
val fullResult: ContactDiscovery.RefreshResult = refreshAll(context, ignoreResults)
val inputIds: Set<RecipientId> = recipients.map { it.id }.toSet()
ContactDiscovery.RefreshResult(
registeredIds = fullResult.registeredIds.intersect(inputIds),
rewrites = fullResult.rewrites.filterKeys { inputE164s.contains(it) }
)
}
if (inputE164s.isEmpty()) {
Log.w(TAG, "No numbers to refresh!")
return ContactDiscovery.RefreshResult(emptySet(), emptyMap())
} else {
Log.i(TAG, "Doing a one-off request for ${inputE164s.size} recipients.")
}
val response: CdsiV2Service.Response = makeRequest(
previousE164s = emptySet(),
newE164s = inputE164s,
serviceIds = SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
token = null,
saveToken = false,
tag = "refresh-some"
)
stopwatch.split("network")
var registeredIds: Set<RecipientId> = emptySet()
if (ignoreResults) {
Log.w(TAG, "[refresh-some] Ignoring CDSv2 results.")
} else {
registeredIds = SignalDatabase.recipients.bulkProcessCdsV2Result(
response.results
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
refreshInternal(
recipientE164s = inputE164s,
systemE164s = inputE164s,
inputPreviousE164s = emptySet(),
saveToken = false,
useCompat = useCompat,
ignoreResults = ignoreResults
)
stopwatch.split("recipient-db")
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
stopwatch.split("update-registered")
}
Log.d(TAG, "[refresh-some] Used ${response.quotaUsedDebugOnly} units of our quota.")
stopwatch.stop(TAG)
return ContactDiscovery.RefreshResult(registeredIds, emptyMap())
}
@Throws(IOException::class)
private fun makeRequest(previousE164s: Set<String>, newE164s: Set<String>, serviceIds: Map<ServiceId, ProfileKey>, token: ByteArray?, saveToken: Boolean, tag: String): CdsiV2Service.Response {
return ApplicationDependencies.getSignalServiceAccountManager().getRegisteredUsersWithCdsi(
private fun refreshInternal(
recipientE164s: Set<String>,
systemE164s: Set<String>,
inputPreviousE164s: Set<String>,
saveToken: Boolean,
useCompat: Boolean,
ignoreResults: Boolean
): ContactDiscovery.RefreshResult {
val stopwatch = Stopwatch("refreshInternal-${if (useCompat) "compat" else "v2"}")
val previousE164s: Set<String> = if (SignalStore.misc().cdsToken != null) inputPreviousE164s else emptySet()
val allE164s: Set<String> = recipientE164s + systemE164s
val newRawE164s: Set<String> = allE164s - previousE164s
val fuzzyInput: InputResult = FuzzyPhoneNumberHelper.generateInput(newRawE164s, recipientE164s)
val newE164s: Set<String> = fuzzyInput.numbers
if (newE164s.isEmpty() && previousE164s.isEmpty()) {
Log.w(TAG, "[refreshInternal] No data to send! Ignoring.")
return ContactDiscovery.RefreshResult(emptySet(), emptyMap())
}
val token: ByteArray? = if (previousE164s.isNotEmpty()) SignalStore.misc().cdsToken else null
stopwatch.split("preamble")
val response: CdsiV2Service.Response = ApplicationDependencies.getSignalServiceAccountManager().getRegisteredUsersWithCdsi(
previousE164s,
newE164s,
serviceIds,
SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
useCompat,
Optional.ofNullable(token),
BuildConfig.CDSI_MRENCLAVE
) { tokenToSave ->
if (saveToken) {
SignalStore.misc().cdsToken = tokenToSave
Log.d(TAG, "[$tag] Token saved!")
Log.d(TAG, "Token saved!")
} else {
Log.d(TAG, "Ignoring token.")
}
}
Log.d(TAG, "[refreshInternal] Used ${response.quotaUsedDebugOnly} quota.")
stopwatch.split("network")
SignalDatabase.cds.updateAfterCdsQuery(newE164s, allE164s + newE164s)
stopwatch.split("cds-db")
val registeredIds: MutableSet<RecipientId> = mutableSetOf()
val rewrites: MutableMap<String, String> = mutableMapOf()
if (ignoreResults) {
Log.w(TAG, "[refreshInternal] Ignoring CDSv2 results.")
} else {
if (useCompat) {
val transformed: Map<String, ACI?> = response.results.mapValues { entry -> entry.value.aci.orElse(null) }
val fuzzyOutput: OutputResult<ACI> = FuzzyPhoneNumberHelper.generateOutput(transformed, fuzzyInput)
if (transformed.values.any { it == null }) {
throw IOException("Unexpected null ACI!")
}
SignalDatabase.recipients.rewritePhoneNumbers(fuzzyOutput.rewrites)
stopwatch.split("rewrite-e164")
val aciMap: Map<RecipientId, ACI?> = SignalDatabase.recipients.bulkProcessCdsResult(fuzzyOutput.numbers)
registeredIds += aciMap.keys
rewrites += fuzzyOutput.rewrites
stopwatch.split("process-result")
val existingIds: Set<RecipientId> = SignalDatabase.recipients.getAllPossiblyRegisteredByE164(recipientE164s + rewrites.values)
val inactiveIds: Set<RecipientId> = (existingIds - registeredIds).removeRegisteredButUnlisted()
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(aciMap, inactiveIds)
stopwatch.split("update-registered")
} else {
val transformed: Map<String, CdsV2Result> = response.results.mapValues { entry -> CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
val fuzzyOutput: OutputResult<CdsV2Result> = FuzzyPhoneNumberHelper.generateOutput(transformed, fuzzyInput)
SignalDatabase.recipients.rewritePhoneNumbers(fuzzyOutput.rewrites)
stopwatch.split("rewrite-e164")
val existingIds: Set<RecipientId> = SignalDatabase.recipients.getAllPossiblyRegisteredByE164(recipientE164s + rewrites.values)
val inactiveIds: Set<RecipientId> = (existingIds - registeredIds).removeRegisteredButUnlisted()
registeredIds += SignalDatabase.recipients.bulkProcessCdsV2Result(fuzzyOutput.numbers)
rewrites += fuzzyOutput.rewrites
stopwatch.split("process-result")
SignalDatabase.recipients.bulkUpdatedRegisteredStatusV2(registeredIds, inactiveIds)
stopwatch.split("update-registered")
}
}
stopwatch.stop(TAG)
return ContactDiscovery.RefreshResult(registeredIds, rewrites)
}
private fun hasCommunicatedWith(recipient: Recipient): Boolean {
val localAci = SignalStore.account().requireAci()
return SignalDatabase.threads.hasThread(recipient.id) || (recipient.hasServiceId() && SignalDatabase.sessions.hasSessionFor(localAci, recipient.requireServiceId().toString()))
}
@WorkerThread
private fun Set<RecipientId>.removeRegisteredButUnlisted(): Set<RecipientId> {
val futures: List<Future<Pair<RecipientId, Boolean?>>> = Recipient.resolvedList(this)
.filter { hasCommunicatedWith(it) }
.map {
SignalExecutors.UNBOUNDED.submit(
Callable {
try {
it.id to ApplicationDependencies.getSignalServiceAccountManager().isIdentifierRegistered(it.requireServiceId())
} catch (e: IOException) {
it.id to null
}
}
)
}
val registeredIds: MutableSet<RecipientId> = mutableSetOf()
val retryIds: MutableSet<RecipientId> = mutableSetOf()
for (future in futures) {
val (id, registered) = future.get()
if (registered == null) {
retryIds += id
registeredIds += id
} else if (registered) {
registeredIds += id
}
}
if (retryIds.isNotEmpty()) {
Log.w(TAG, "Failed to determine registered status of ${retryIds.size} recipients. Assuming registered, but enqueuing profile jobs to check later.")
RetrieveProfileJob.enqueue(retryIds)
}
return this - registeredIds
}
private fun Set<String>.toE164s(context: Context): Set<String> {

View File

@@ -51,8 +51,8 @@ class FuzzyPhoneNumberHelper {
* these results and our initial input set, we can decide if we need to rewrite which number we
* have stored locally.
*/
static @NonNull OutputResult generateOutput(@NonNull Map<String, ACI> registeredNumbers, @NonNull InputResult inputResult) {
Map<String, ACI> allNumbers = new HashMap<>(registeredNumbers);
static @NonNull <E> OutputResult<E> generateOutput(@NonNull Map<String, E> registeredNumbers, @NonNull InputResult inputResult) {
Map<String, E> allNumbers = new HashMap<>(registeredNumbers);
Map<String, String> rewrites = new HashMap<>();
for (Map.Entry<String, String> entry : inputResult.getMapOfOriginalToVariant().entrySet()) {
@@ -76,7 +76,7 @@ class FuzzyPhoneNumberHelper {
}
}
return new OutputResult(allNumbers, rewrites);
return new OutputResult<>(allNumbers, rewrites);
}
private interface FuzzyMatcher {
@@ -170,16 +170,16 @@ class FuzzyPhoneNumberHelper {
}
}
public static class OutputResult {
private final Map<String, ACI> numbers;
public static class OutputResult<E> {
private final Map<String, E> numbers;
private final Map<String, String> rewrites;
private OutputResult(@NonNull Map<String, ACI> numbers, @NonNull Map<String, String> rewrites) {
private OutputResult(@NonNull Map<String, E> numbers, @NonNull Map<String, String> rewrites) {
this.numbers = numbers;
this.rewrites = rewrites;
}
public @NonNull Map<String, ACI> getNumbers() {
public @NonNull Map<String, E> getNumbers() {
return numbers;
}