mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-02-22 18:55:12 +00:00
Add ability to do unused reads from CDSv2 to test server load.
This commit is contained in:
committed by
Cody Henthorne
parent
84717b95f7
commit
15e52a8b88
@@ -13,6 +13,7 @@ import org.signal.contacts.SystemContactsRepository.ContactIterator
|
||||
import org.signal.contacts.SystemContactsRepository.ContactPhoneDetails
|
||||
import org.signal.core.util.Stopwatch
|
||||
import org.signal.core.util.StringUtil
|
||||
import org.signal.core.util.concurrent.SignalExecutors
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.thoughtcrime.securesms.BuildConfig
|
||||
import org.thoughtcrime.securesms.R
|
||||
@@ -36,7 +37,11 @@ import org.thoughtcrime.securesms.util.Util
|
||||
import org.whispersystems.signalservice.api.push.SignalServiceAddress
|
||||
import org.whispersystems.signalservice.api.util.UuidUtil
|
||||
import java.io.IOException
|
||||
import java.lang.Exception
|
||||
import java.util.Calendar
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Future
|
||||
|
||||
/**
|
||||
* Methods for discovering which users are registered and marking them as such in the database.
|
||||
@@ -76,6 +81,8 @@ object ContactDiscovery {
|
||||
refresh = {
|
||||
if (FeatureFlags.phoneNumberPrivacy()) {
|
||||
ContactDiscoveryRefreshV2.refreshAll(context)
|
||||
} else if (FeatureFlags.cdsV2LoadTesting()) {
|
||||
loadTestRefreshAll(context)
|
||||
} else {
|
||||
ContactDiscoveryRefreshV1.refreshAll(context)
|
||||
}
|
||||
@@ -97,6 +104,8 @@ object ContactDiscovery {
|
||||
refresh = {
|
||||
if (FeatureFlags.phoneNumberPrivacy()) {
|
||||
ContactDiscoveryRefreshV2.refresh(context, recipients)
|
||||
} else if (FeatureFlags.cdsV2LoadTesting()) {
|
||||
loadTestRefresh(context, recipients)
|
||||
} else {
|
||||
ContactDiscoveryRefreshV1.refresh(context, recipients)
|
||||
}
|
||||
@@ -116,6 +125,8 @@ object ContactDiscovery {
|
||||
refresh = {
|
||||
if (FeatureFlags.phoneNumberPrivacy()) {
|
||||
ContactDiscoveryRefreshV2.refresh(context, listOf(recipient))
|
||||
} else if (FeatureFlags.cdsV2LoadTesting()) {
|
||||
loadTestRefresh(context, listOf(recipient))
|
||||
} else {
|
||||
ContactDiscoveryRefreshV1.refresh(context, listOf(recipient))
|
||||
}
|
||||
@@ -367,6 +378,38 @@ object ContactDiscovery {
|
||||
ApplicationDependencies.getProtocolStore().pni().containsSession(protocolAddress)
|
||||
}
|
||||
|
||||
private fun loadTestRefreshAll(context: Context): RefreshResult {
|
||||
return loadTestOperation(
|
||||
{ ContactDiscoveryRefreshV1.refreshAll(context) },
|
||||
{ ContactDiscoveryRefreshV2.refreshAll(context, ignoreResults = true) }
|
||||
)
|
||||
}
|
||||
|
||||
private fun loadTestRefresh(context: Context, recipients: List<Recipient>): RefreshResult {
|
||||
return loadTestOperation(
|
||||
{ ContactDiscoveryRefreshV1.refresh(context, recipients) },
|
||||
{ ContactDiscoveryRefreshV2.refresh(context, recipients, ignoreResults = true) }
|
||||
)
|
||||
}
|
||||
|
||||
private fun loadTestOperation(operationV1: Callable<RefreshResult>, operationV2: Callable<RefreshResult>): RefreshResult {
|
||||
val v1Future: Future<RefreshResult> = SignalExecutors.UNBOUNDED.submit(operationV1)
|
||||
val v2Future: Future<RefreshResult> = SignalExecutors.UNBOUNDED.submit(operationV2)
|
||||
|
||||
try {
|
||||
v2Future.get()
|
||||
} catch (e: Exception) {
|
||||
Log.w(TAG, "Failed to complete the V2 fetch!", e)
|
||||
}
|
||||
|
||||
try {
|
||||
return v1Future.get()
|
||||
} catch (e: ExecutionException) {
|
||||
Log.w(TAG, "Hit exception during V1 fetch!", e)
|
||||
throw e.cause!!
|
||||
}
|
||||
}
|
||||
|
||||
class RefreshResult(
|
||||
val registeredIds: Set<RecipientId>,
|
||||
val rewrites: Map<String, String>
|
||||
|
||||
@@ -62,7 +62,8 @@ import io.reactivex.rxjava3.schedulers.Schedulers;
|
||||
*/
|
||||
class ContactDiscoveryRefreshV1 {
|
||||
|
||||
private static final String TAG = Log.tag(ContactDiscoveryRefreshV1.class);
|
||||
// Using Log.tag will cut off the version number
|
||||
private static final String TAG = "CdsRefreshV1";
|
||||
|
||||
private static final int MAX_NUMBERS = 20_500;
|
||||
|
||||
|
||||
@@ -24,7 +24,8 @@ import java.util.Optional
|
||||
*/
|
||||
object ContactDiscoveryRefreshV2 {
|
||||
|
||||
private val TAG = Log.tag(ContactDiscoveryRefreshV2::class.java)
|
||||
// Using Log.tag will cut off the version number
|
||||
private const val TAG = "CdsRefreshV2"
|
||||
|
||||
/**
|
||||
* The maximum number items we will allow in a 'one-off' request.
|
||||
@@ -38,7 +39,7 @@ object ContactDiscoveryRefreshV2 {
|
||||
@WorkerThread
|
||||
@Synchronized
|
||||
@JvmStatic
|
||||
fun refreshAll(context: Context): ContactDiscovery.RefreshResult {
|
||||
fun refreshAll(context: Context, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
|
||||
val stopwatch = Stopwatch("refresh-all")
|
||||
|
||||
val previousE164s: Set<String> = if (SignalStore.misc().cdsToken != null) {
|
||||
@@ -59,28 +60,45 @@ object ContactDiscoveryRefreshV2 {
|
||||
|
||||
val newE164s: Set<String> = newRecipientE164s + newSystemE164s
|
||||
|
||||
val tokenToUse: ByteArray? = if (previousE164s.isNotEmpty()) {
|
||||
SignalStore.misc().cdsToken
|
||||
} else {
|
||||
if (SignalStore.misc().cdsToken != null) {
|
||||
Log.w(TAG, "We have a token, but our previousE164 list is empty! We cannot provide a token.")
|
||||
}
|
||||
null
|
||||
}
|
||||
|
||||
val response: CdsiV2Service.Response = makeRequest(
|
||||
previousE164s = previousE164s,
|
||||
newE164s = newE164s,
|
||||
serviceIds = SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
|
||||
token = SignalStore.misc().cdsToken,
|
||||
saveToken = true
|
||||
token = tokenToUse,
|
||||
saveToken = true,
|
||||
tag = "refresh-all"
|
||||
)
|
||||
stopwatch.split("network")
|
||||
|
||||
SignalDatabase.cds.updateAfterCdsQuery(newE164s, recipientE164s + systemE164s)
|
||||
stopwatch.split("cds-db")
|
||||
|
||||
val registeredIds: Set<RecipientId> = SignalDatabase.recipients.bulkProcessCdsV2Result(
|
||||
response.results
|
||||
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
|
||||
)
|
||||
stopwatch.split("recipient-db")
|
||||
var registeredIds: Set<RecipientId> = emptySet()
|
||||
|
||||
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
|
||||
stopwatch.split("update-registered")
|
||||
if (ignoreResults) {
|
||||
Log.w(TAG, "[refresh-all] Ignoring CDSv2 results.")
|
||||
} else {
|
||||
registeredIds = SignalDatabase.recipients.bulkProcessCdsV2Result(
|
||||
response.results
|
||||
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
|
||||
)
|
||||
stopwatch.split("recipient-db")
|
||||
|
||||
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
|
||||
stopwatch.split("update-registered")
|
||||
}
|
||||
|
||||
stopwatch.stop(TAG)
|
||||
Log.d(TAG, "[refresh-all] Used ${response.quotaUsedDebugOnly} units of our quota.")
|
||||
|
||||
return ContactDiscovery.RefreshResult(registeredIds, emptyMap())
|
||||
}
|
||||
@@ -89,7 +107,7 @@ object ContactDiscoveryRefreshV2 {
|
||||
@WorkerThread
|
||||
@Synchronized
|
||||
@JvmStatic
|
||||
fun refresh(context: Context, inputRecipients: List<Recipient>): ContactDiscovery.RefreshResult {
|
||||
fun refresh(context: Context, inputRecipients: List<Recipient>, ignoreResults: Boolean = false): ContactDiscovery.RefreshResult {
|
||||
val stopwatch = Stopwatch("refresh-some")
|
||||
|
||||
val recipients = inputRecipients.map { it.resolve() }
|
||||
@@ -100,7 +118,7 @@ object ContactDiscoveryRefreshV2 {
|
||||
|
||||
if (inputE164s.size > MAXIMUM_ONE_OFF_REQUEST_SIZE) {
|
||||
Log.i(TAG, "List of specific recipients to refresh is too large! (Size: ${recipients.size}). Doing a full refresh instead.")
|
||||
val fullResult: ContactDiscovery.RefreshResult = refreshAll(context)
|
||||
val fullResult: ContactDiscovery.RefreshResult = refreshAll(context, ignoreResults)
|
||||
|
||||
return ContactDiscovery.RefreshResult(
|
||||
registeredIds = fullResult.registeredIds.intersect(inputIds),
|
||||
@@ -120,35 +138,44 @@ object ContactDiscoveryRefreshV2 {
|
||||
newE164s = inputE164s,
|
||||
serviceIds = SignalDatabase.recipients.getAllServiceIdProfileKeyPairs(),
|
||||
token = null,
|
||||
saveToken = false
|
||||
saveToken = false,
|
||||
tag = "refresh-some"
|
||||
)
|
||||
stopwatch.split("network")
|
||||
|
||||
val registeredIds: Set<RecipientId> = SignalDatabase.recipients.bulkProcessCdsV2Result(
|
||||
response.results
|
||||
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
|
||||
)
|
||||
stopwatch.split("recipient-db")
|
||||
var registeredIds: Set<RecipientId> = emptySet()
|
||||
|
||||
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
|
||||
stopwatch.split("update-registered")
|
||||
if (ignoreResults) {
|
||||
Log.w(TAG, "[refresh-some] Ignoring CDSv2 results.")
|
||||
} else {
|
||||
registeredIds = SignalDatabase.recipients.bulkProcessCdsV2Result(
|
||||
response.results
|
||||
.mapValues { entry -> RecipientDatabase.CdsV2Result(entry.value.pni, entry.value.aci.orElse(null)) }
|
||||
)
|
||||
stopwatch.split("recipient-db")
|
||||
|
||||
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(registeredIds.associateWith { null }, emptyList())
|
||||
stopwatch.split("update-registered")
|
||||
}
|
||||
|
||||
Log.d(TAG, "[refresh-some] Used ${response.quotaUsedDebugOnly} units of our quota.")
|
||||
stopwatch.stop(TAG)
|
||||
|
||||
return ContactDiscovery.RefreshResult(registeredIds, emptyMap())
|
||||
}
|
||||
|
||||
@Throws(IOException::class)
|
||||
private fun makeRequest(previousE164s: Set<String>, newE164s: Set<String>, serviceIds: Map<ServiceId, ProfileKey>, token: ByteArray?, saveToken: Boolean): CdsiV2Service.Response {
|
||||
private fun makeRequest(previousE164s: Set<String>, newE164s: Set<String>, serviceIds: Map<ServiceId, ProfileKey>, token: ByteArray?, saveToken: Boolean, tag: String): CdsiV2Service.Response {
|
||||
return ApplicationDependencies.getSignalServiceAccountManager().getRegisteredUsersWithCdsi(
|
||||
previousE164s,
|
||||
newE164s,
|
||||
serviceIds,
|
||||
Optional.ofNullable(token),
|
||||
BuildConfig.CDSI_MRENCLAVE
|
||||
) { token ->
|
||||
) { tokenToSave ->
|
||||
if (saveToken) {
|
||||
SignalStore.misc().cdsToken = token
|
||||
SignalStore.misc().cdsToken = tokenToSave
|
||||
Log.d(TAG, "[$tag] Token saved!")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +101,7 @@ public final class FeatureFlags {
|
||||
private static final String TELECOM_MODEL_BLOCKLIST = "android.calling.telecomModelBlockList";
|
||||
private static final String CAMERAX_MODEL_BLOCKLIST = "android.cameraXModelBlockList";
|
||||
private static final String RECIPIENT_MERGE_V2 = "android.recipientMergeV2";
|
||||
private static final String CDS_V2_LOAD_TEST = "android.csdV2LoadTest";
|
||||
|
||||
/**
|
||||
* We will only store remote values for flags in this set. If you want a flag to be controllable
|
||||
@@ -154,7 +155,8 @@ public final class FeatureFlags {
|
||||
TELECOM_MANUFACTURER_ALLOWLIST,
|
||||
TELECOM_MODEL_BLOCKLIST,
|
||||
CAMERAX_MODEL_BLOCKLIST,
|
||||
RECIPIENT_MERGE_V2
|
||||
RECIPIENT_MERGE_V2,
|
||||
CDS_V2_LOAD_TEST
|
||||
);
|
||||
|
||||
@VisibleForTesting
|
||||
@@ -217,7 +219,8 @@ public final class FeatureFlags {
|
||||
TELECOM_MANUFACTURER_ALLOWLIST,
|
||||
TELECOM_MODEL_BLOCKLIST,
|
||||
CAMERAX_MODEL_BLOCKLIST,
|
||||
RECIPIENT_MERGE_V2
|
||||
RECIPIENT_MERGE_V2,
|
||||
CDS_V2_LOAD_TEST
|
||||
);
|
||||
|
||||
/**
|
||||
@@ -545,6 +548,13 @@ public final class FeatureFlags {
|
||||
return getBoolean(RECIPIENT_MERGE_V2, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether or not we should also query CDSv2 as a form of load test.
|
||||
*/
|
||||
public static boolean cdsV2LoadTesting() {
|
||||
return getBoolean(CDS_V2_LOAD_TEST, false);
|
||||
}
|
||||
|
||||
/** Only for rendering debug info. */
|
||||
public static synchronized @NonNull Map<String, Object> getMemoryValues() {
|
||||
return new TreeMap<>(REMOTE_VALUES);
|
||||
|
||||
Reference in New Issue
Block a user