diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt index 8eaf4ab865..04d09283de 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt @@ -40,7 +40,7 @@ class AliceClient(val serviceId: ServiceId, val e164: String, val trustRoot: ECK ApplicationDependencies.getIncomingMessageObserver() .processEnvelope(bufferedStore, envelope, serverDeliveredTimestamp) ?.mapNotNull { it.run() } - ?.forEach { ApplicationDependencies.getJobManager().add(it) } + ?.forEach { it.enqueue() } bufferedStore.flushToDisk() val end = System.currentTimeMillis() diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/InternalSettingsFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/InternalSettingsFragment.kt index e57e429c83..3f97a23cab 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/InternalSettingsFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/InternalSettingsFragment.kt @@ -186,6 +186,48 @@ class InternalSettingsFragment : DSLSettingsFragment(R.string.preferences__inter } ) + clickPref( + title = DSLSettingsText.from("Log dump PreKey ServiceId-KeyIds"), + onClick = { + logPreKeyIds() + } + ) + + clickPref( + title = DSLSettingsText.from("Retry all jobs now"), + summary = DSLSettingsText.from("Clear backoff intervals, app will restart"), + onClick = { + SimpleTask.run({ + JobDatabase.getInstance(ApplicationDependencies.getApplication()).debugResetBackoffInterval() + }) { + AppUtil.restart(requireContext()) + } + } + ) + + clickPref( + title = DSLSettingsText.from("Delete all prekeys"), + summary = DSLSettingsText.from("Deletes all signed/last-resort/one-time prekeys for both ACI and PNI accounts. WILL cause problems."), + onClick = { + MaterialAlertDialogBuilder(requireContext()) + .setTitle("Delete all prekeys?") + .setMessage("Are you sure? This will delete all prekeys for both ACI and PNI accounts. This WILL cause problems.") + .setPositiveButton(android.R.string.ok) { _, _ -> + SignalDatabase.signedPreKeys.debugDeleteAll() + SignalDatabase.oneTimePreKeys.debugDeleteAll() + SignalDatabase.kyberPreKeys.debugDeleteAll() + + Toast.makeText(requireContext(), "All prekeys deleted!", Toast.LENGTH_SHORT).show() + } + .setNegativeButton(android.R.string.cancel, null) + .show() + } + ) + + dividerPref() + + sectionHeaderPref(DSLSettingsText.from("Logging")) + clickPref( title = DSLSettingsText.from("Clear all logs"), onClick = { @@ -227,21 +269,10 @@ class InternalSettingsFragment : DSLSettingsFragment(R.string.preferences__inter ) clickPref( - title = DSLSettingsText.from("Log dump PreKey ServiceId-KeyIds"), + title = DSLSettingsText.from("Clear local metrics"), + summary = DSLSettingsText.from("Click to clear all local metrics state."), onClick = { - logPreKeyIds() - } - ) - - clickPref( - title = DSLSettingsText.from("Retry all jobs now"), - summary = DSLSettingsText.from("Clear backoff intervals, app will restart"), - onClick = { - SimpleTask.run({ - JobDatabase.getInstance(ApplicationDependencies.getApplication()).debugResetBackoffInterval() - }) { - AppUtil.restart(requireContext()) - } + clearAllLocalMetricsState() } ) @@ -436,18 +467,6 @@ class InternalSettingsFragment : DSLSettingsFragment(R.string.preferences__inter dividerPref() - sectionHeaderPref(DSLSettingsText.from("Local Metrics")) - - clickPref( - title = DSLSettingsText.from("Clear local metrics"), - summary = DSLSettingsText.from("Click to clear all local metrics state."), - onClick = { - clearAllLocalMetricsState() - } - ) - - dividerPref() - sectionHeaderPref(DSLSettingsText.from("Group call server")) radioPref( diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/KyberPreKeyTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/KyberPreKeyTable.kt index 558ba3e241..bec3b247dc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/KyberPreKeyTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/KyberPreKeyTable.kt @@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.database import android.content.Context import org.signal.core.util.delete +import org.signal.core.util.deleteAll import org.signal.core.util.exists import org.signal.core.util.insertInto import org.signal.core.util.logging.Log @@ -171,6 +172,10 @@ class KyberPreKeyTable(context: Context, databaseHelper: SignalDatabase) : Datab Log.i(TAG, "Deleted $count stale one-time EC prekeys.") } + fun debugDeleteAll() { + writableDatabase.deleteAll(OneTimePreKeyTable.TABLE_NAME) + } + data class KyberPreKey( val record: KyberPreKeyRecord, val lastResort: Boolean diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/OneTimePreKeyTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/OneTimePreKeyTable.kt index 1cac549f05..1cf59d43ec 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/OneTimePreKeyTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/OneTimePreKeyTable.kt @@ -5,6 +5,7 @@ import androidx.core.content.contentValuesOf import org.signal.core.util.Base64 import org.signal.core.util.SqlUtil import org.signal.core.util.delete +import org.signal.core.util.deleteAll import org.signal.core.util.logging.Log import org.signal.core.util.requireNonNullString import org.signal.core.util.update @@ -115,6 +116,10 @@ class OneTimePreKeyTable(context: Context, databaseHelper: SignalDatabase) : Dat Log.i(TAG, "Deleted $count stale one-time EC prekeys.") } + fun debugDeleteAll() { + writableDatabase.deleteAll(TABLE_NAME) + } + private fun ServiceId.toAccountId(): String { return when (this) { is ServiceId.ACI -> this.toString() diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SignedPreKeyTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/SignedPreKeyTable.kt index 3396b316dc..7679d26737 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/SignedPreKeyTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/SignedPreKeyTable.kt @@ -4,6 +4,7 @@ import android.content.Context import androidx.core.content.contentValuesOf import org.signal.core.util.Base64 import org.signal.core.util.SqlUtil +import org.signal.core.util.deleteAll import org.signal.core.util.logging.Log import org.signal.core.util.requireInt import org.signal.core.util.requireLong @@ -103,6 +104,10 @@ class SignedPreKeyTable(context: Context, databaseHelper: SignalDatabase) : Data writableDatabase.delete(TABLE_NAME, "$ACCOUNT_ID = ? AND $KEY_ID = ?", SqlUtil.buildArgs(serviceId.toAccountId(), keyId)) } + fun debugDeleteAll() { + writableDatabase.deleteAll(OneTimePreKeyTable.TABLE_NAME) + } + private fun ServiceId.toAccountId(): String { return when (this) { is ServiceId.ACI -> this.toString() diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 73a068e546..e0e2bb5e90 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -76,6 +76,15 @@ class JobController { notifyAll(); } + @WorkerThread + void submitNewJobChains(@NonNull List>> chains) { + synchronized (this) { + for (List> chain : chains) { + submitNewJobChain(chain); + } + } + } + @WorkerThread void submitNewJobChain(@NonNull List> chain) { synchronized (this) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index 834ebe45e2..ffa70fbae0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Allows the scheduling of durable jobs that will be run as early as possible. @@ -208,6 +209,24 @@ public class JobManager implements ConstraintObserver.Notifier { }); } + public void addAllChains(@NonNull List chains) { + if (chains.isEmpty()) { + return; + } + + for (Chain chain : chains) { + for (List jobList : chain.getJobListChain()) { + for (Job job : jobList) { + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); + } + } + } + + runOnExecutor(() -> { + jobController.submitNewJobChains(chains.stream().map(Chain::getJobListChain).collect(Collectors.toList())); + }); + } + /** * Begins the creation of a job chain with a single job. * @see Chain diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt index 3f5f60bc67..f3b51d7a59 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt @@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.jobs import androidx.annotation.VisibleForTesting import org.signal.core.util.logging.Log +import org.signal.core.util.roundedString import org.signal.libsignal.protocol.state.KyberPreKeyRecord import org.signal.libsignal.protocol.state.PreKeyRecord import org.signal.libsignal.protocol.state.SignalProtocolStore @@ -11,7 +12,9 @@ import org.thoughtcrime.securesms.crypto.storage.PreKeyMetadataStore import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.protos.PreKeysSyncJobData import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.util.FeatureFlags import org.whispersystems.signalservice.api.SignalServiceAccountDataStore import org.whispersystems.signalservice.api.account.PreKeyUpload import org.whispersystems.signalservice.api.push.ServiceId @@ -34,7 +37,10 @@ import kotlin.time.DurationUnit * It will also rotate/create last-resort kyber prekeys for both ACI and PNI identities, as well as ensure * that the user has a sufficient number of one-time kyber prekeys available on the service. */ -class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(parameters) { +class PreKeysSyncJob private constructor( + parameters: Parameters, + private val forceRotationRequested: Boolean +) : BaseJob(parameters) { companion object { const val KEY = "PreKeysSyncJob" @@ -52,9 +58,13 @@ class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(param @JvmField val MAXIMUM_ALLOWED_SIGNED_PREKEY_AGE = 14.days.inWholeMilliseconds + /** + * @param forceRotationRequested If true, this will force the rotation of all keys, provided we haven't already done a forced refresh recently. + */ + @JvmOverloads @JvmStatic - fun create(): PreKeysSyncJob { - return PreKeysSyncJob() + fun create(forceRotationRequested: Boolean = false): PreKeysSyncJob { + return PreKeysSyncJob(forceRotationRequested) } @JvmStatic @@ -87,19 +97,22 @@ class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(param } @VisibleForTesting(otherwise = VisibleForTesting.PRIVATE) - constructor() : this( + constructor(forceRotation: Boolean = false) : this( Parameters.Builder() .setQueue("PreKeysSyncJob") .addConstraint(NetworkConstraint.KEY) .setMaxInstancesForFactory(1) .setMaxAttempts(Parameters.UNLIMITED) .setLifespan(TimeUnit.DAYS.toMillis(30)) - .build() + .build(), + forceRotation ) override fun getFactoryKey(): String = KEY - override fun serialize(): ByteArray? = null + override fun serialize(): ByteArray { + return PreKeysSyncJobData(forceRotationRequested).encode() + } override fun onRun() { if (!SignalStore.account().isRegistered || SignalStore.account().aci == null || SignalStore.account().pni == null) { @@ -107,12 +120,30 @@ class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(param return } - syncPreKeys(ServiceIdType.ACI, SignalStore.account().aci, ApplicationDependencies.getProtocolStore().aci(), SignalStore.account().aciPreKeys) - syncPreKeys(ServiceIdType.PNI, SignalStore.account().pni, ApplicationDependencies.getProtocolStore().pni(), SignalStore.account().pniPreKeys) + val forceRotation = if (forceRotationRequested) { + val timeSinceLastForcedRotation = System.currentTimeMillis() - SignalStore.misc().lastForcedPreKeyRefresh + // We check < 0 in case someone changed their clock and had a bad value set + timeSinceLastForcedRotation > FeatureFlags.preKeyForceRefreshInterval() || timeSinceLastForcedRotation < 0 + } else { + false + } + + if (forceRotation) { + warn(TAG, "Forcing prekey rotation.") + } else if (forceRotationRequested) { + warn(TAG, "Forced prekey rotation was requested, but we already did a forced refresh ${System.currentTimeMillis() - SignalStore.misc().lastForcedPreKeyRefresh} ms ago. Ignoring.") + } + + syncPreKeys(ServiceIdType.ACI, SignalStore.account().aci, ApplicationDependencies.getProtocolStore().aci(), SignalStore.account().aciPreKeys, forceRotation) + syncPreKeys(ServiceIdType.PNI, SignalStore.account().pni, ApplicationDependencies.getProtocolStore().pni(), SignalStore.account().pniPreKeys, forceRotation) SignalStore.misc().lastFullPrekeyRefreshTime = System.currentTimeMillis() + + if (forceRotation) { + SignalStore.misc().lastForcedPreKeyRefresh = System.currentTimeMillis() + } } - private fun syncPreKeys(serviceIdType: ServiceIdType, serviceId: ServiceId?, protocolStore: SignalServiceAccountDataStore, metadataStore: PreKeyMetadataStore) { + private fun syncPreKeys(serviceIdType: ServiceIdType, serviceId: ServiceId?, protocolStore: SignalServiceAccountDataStore, metadataStore: PreKeyMetadataStore, forceRotation: Boolean) { if (serviceId == null) { warn(TAG, serviceIdType, "AccountId not set!") return @@ -121,20 +152,20 @@ class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(param val accountManager = ApplicationDependencies.getSignalServiceAccountManager() val availablePreKeyCounts: OneTimePreKeyCounts = accountManager.getPreKeyCounts(serviceIdType) - val signedPreKeyToUpload: SignedPreKeyRecord? = signedPreKeyUploadIfNeeded(serviceIdType, protocolStore, metadataStore) + val signedPreKeyToUpload: SignedPreKeyRecord? = signedPreKeyUploadIfNeeded(serviceIdType, protocolStore, metadataStore, forceRotation) - val oneTimeEcPreKeysToUpload: List? = if (availablePreKeyCounts.ecCount < ONE_TIME_PREKEY_MINIMUM) { - log(serviceIdType, "There are ${availablePreKeyCounts.ecCount} one-time EC prekeys available, which is less than our threshold. Need more.") + val oneTimeEcPreKeysToUpload: List? = if (forceRotation || availablePreKeyCounts.ecCount < ONE_TIME_PREKEY_MINIMUM) { + log(serviceIdType, "There are ${availablePreKeyCounts.ecCount} one-time EC prekeys available, which is less than our threshold. Need more. (Forced: $forceRotation)") PreKeyUtil.generateAndStoreOneTimeEcPreKeys(protocolStore, metadataStore) } else { log(serviceIdType, "There are ${availablePreKeyCounts.ecCount} one-time EC prekeys available, which is enough.") null } - val lastResortKyberPreKeyToUpload: KyberPreKeyRecord? = lastResortKyberPreKeyUploadIfNeeded(serviceIdType, protocolStore, metadataStore) + val lastResortKyberPreKeyToUpload: KyberPreKeyRecord? = lastResortKyberPreKeyUploadIfNeeded(serviceIdType, protocolStore, metadataStore, forceRotation) - val oneTimeKyberPreKeysToUpload: List? = if (availablePreKeyCounts.kyberCount < ONE_TIME_PREKEY_MINIMUM) { - log(serviceIdType, "There are ${availablePreKeyCounts.kyberCount} one-time kyber prekeys available, which is less than our threshold. Need more.") + val oneTimeKyberPreKeysToUpload: List? = if (forceRotation || availablePreKeyCounts.kyberCount < ONE_TIME_PREKEY_MINIMUM) { + log(serviceIdType, "There are ${availablePreKeyCounts.kyberCount} one-time kyber prekeys available, which is less than our threshold. Need more. (Forced: $forceRotation)") PreKeyUtil.generateAndStoreOneTimeKyberPreKeys(protocolStore, metadataStore) } else { log(serviceIdType, "There are ${availablePreKeyCounts.kyberCount} one-time kyber prekeys available, which is enough.") @@ -183,28 +214,28 @@ class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(param PreKeyUtil.cleanOneTimePreKeys(protocolStore) } - private fun signedPreKeyUploadIfNeeded(serviceIdType: ServiceIdType, protocolStore: SignalProtocolStore, metadataStore: PreKeyMetadataStore): SignedPreKeyRecord? { + private fun signedPreKeyUploadIfNeeded(serviceIdType: ServiceIdType, protocolStore: SignalProtocolStore, metadataStore: PreKeyMetadataStore, forceRotation: Boolean): SignedPreKeyRecord? { val signedPreKeyRegistered = metadataStore.isSignedPreKeyRegistered && metadataStore.activeSignedPreKeyId >= 0 val timeSinceLastSignedPreKeyRotation = System.currentTimeMillis() - metadataStore.lastSignedPreKeyRotationTime - return if (!signedPreKeyRegistered || timeSinceLastSignedPreKeyRotation >= REFRESH_INTERVAL || timeSinceLastSignedPreKeyRotation < 0) { - log(serviceIdType, "Rotating signed prekey. SignedPreKeyRegistered: $signedPreKeyRegistered, TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS)} days)") + return if (forceRotation || !signedPreKeyRegistered || timeSinceLastSignedPreKeyRotation >= REFRESH_INTERVAL || timeSinceLastSignedPreKeyRotation < 0) { + log(serviceIdType, "Rotating signed prekey. ForceRotation: $forceRotation, SignedPreKeyRegistered: $signedPreKeyRegistered, TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS).roundedString(2)} days)") PreKeyUtil.generateAndStoreSignedPreKey(protocolStore, metadataStore) } else { - log(serviceIdType, "No need to rotate signed prekey. TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS)} days)") + log(serviceIdType, "No need to rotate signed prekey. TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS).roundedString(2)} days)") null } } - private fun lastResortKyberPreKeyUploadIfNeeded(serviceIdType: ServiceIdType, protocolStore: SignalServiceAccountDataStore, metadataStore: PreKeyMetadataStore): KyberPreKeyRecord? { + private fun lastResortKyberPreKeyUploadIfNeeded(serviceIdType: ServiceIdType, protocolStore: SignalServiceAccountDataStore, metadataStore: PreKeyMetadataStore, forceRotation: Boolean): KyberPreKeyRecord? { val lastResortRegistered = metadataStore.lastResortKyberPreKeyId >= 0 val timeSinceLastSignedPreKeyRotation = System.currentTimeMillis() - metadataStore.lastResortKyberPreKeyRotationTime - return if (!lastResortRegistered || timeSinceLastSignedPreKeyRotation >= REFRESH_INTERVAL || timeSinceLastSignedPreKeyRotation < 0) { - log(serviceIdType, "Rotating last-resort kyber prekey. TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS)} days)") + return if (forceRotation || !lastResortRegistered || timeSinceLastSignedPreKeyRotation >= REFRESH_INTERVAL || timeSinceLastSignedPreKeyRotation < 0) { + log(serviceIdType, "Rotating last-resort kyber prekey. ForceRotation: $forceRotation, TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS).roundedString(2)} days)") PreKeyUtil.generateAndStoreLastResortKyberPreKey(protocolStore, metadataStore) } else { - log(serviceIdType, "No need to rotate last-resort kyber prekey. TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS)} days)") + log(serviceIdType, "No need to rotate last-resort kyber prekey. TimeSinceLastRotation: $timeSinceLastSignedPreKeyRotation ms (${timeSinceLastSignedPreKeyRotation.milliseconds.toDouble(DurationUnit.DAYS).roundedString(2)} days)") null } } @@ -225,7 +256,10 @@ class PreKeysSyncJob private constructor(parameters: Parameters) : BaseJob(param class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): PreKeysSyncJob { - return PreKeysSyncJob(parameters) + return serializedData?.let { + val data = PreKeysSyncJobData.ADAPTER.decode(serializedData) + PreKeysSyncJob(parameters, data.forceRefreshRequested) + } ?: PreKeysSyncJob(parameters, forceRotationRequested = false) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.java b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.java index ec8df5a8a2..d191b4194e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.java +++ b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.java @@ -40,6 +40,7 @@ public final class MiscellaneousValues extends SignalStoreValues { private static final String SERVER_TIME_OFFSET = "misc.server_time_offset"; private static final String LAST_SERVER_TIME_OFFSET_UPDATE = "misc.last_server_time_offset_update"; private static final String NEEDS_USERNAME_RESTORE = "misc.needs_username_restore"; + private static final String LAST_FORCED_PREKEY_REFRESH = "misc.last_forced_prekey_refresh"; MiscellaneousValues(@NonNull KeyValueStore store) { super(store); @@ -344,4 +345,18 @@ public final class MiscellaneousValues extends SignalStoreValues { public void setNeedsUsernameRestore(boolean value) { putBoolean(NEEDS_USERNAME_RESTORE, value); } + + /** + * Set the last time we successfully completed a forced prekey refresh. + */ + public void setLastForcedPreKeyRefresh(long time) { + putLong(LAST_FORCED_PREKEY_REFRESH, time); + } + + /** + * Get the last time we successfully completed a forced prekey refresh. + */ + public long getLastForcedPreKeyRefresh() { + return getLong(LAST_FORCED_PREKEY_REFRESH, 0); + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index 57975b0c1d..b351ae7ac4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -30,6 +30,7 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.util.AppForegroundObserver import org.thoughtcrime.securesms.util.SignalLocalMetrics +import org.thoughtcrime.securesms.util.asChain import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.util.UuidUtil import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState @@ -295,7 +296,7 @@ class IncomingMessageObserver(private val context: Application) { is MessageDecryptor.Result.Success -> { val job = PushProcessMessageJob.processOrDefer(messageContentProcessor, result, localReceiveMetric) if (job != null) { - return result.followUpOperations + FollowUpOperation { job } + return result.followUpOperations + FollowUpOperation { job.asChain() } } } is MessageDecryptor.Result.Error -> { @@ -304,7 +305,7 @@ class IncomingMessageObserver(private val context: Application) { result.toMessageState(), result.errorMetadata.toExceptionMetadata(), result.envelope.timestamp!! - ) + ).asChain() } } is MessageDecryptor.Result.Ignore -> { @@ -404,7 +405,7 @@ class IncomingMessageObserver(private val context: Application) { if (followUpOperations != null) { Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...") val jobs = followUpOperations.mapNotNull { it.run() } - ApplicationDependencies.getJobManager().addAll(jobs) + ApplicationDependencies.getJobManager().addAllChains(jobs) } signalWebSocket.sendAck(response) diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt index fe0ca69e20..42f564631e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt @@ -37,7 +37,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.groups.BadGroupIdException import org.thoughtcrime.securesms.groups.GroupId -import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.JobManager import org.thoughtcrime.securesms.jobs.AutomaticSessionResetJob import org.thoughtcrime.securesms.jobs.PreKeysSyncJob import org.thoughtcrime.securesms.jobs.SendRetryReceiptJob @@ -50,6 +50,8 @@ import org.thoughtcrime.securesms.notifications.NotificationIds import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.util.FeatureFlags +import org.thoughtcrime.securesms.util.LRUCache +import org.thoughtcrime.securesms.util.asChain import org.whispersystems.signalservice.api.InvalidMessageStructureException import org.whispersystems.signalservice.api.crypto.ContentHint import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata @@ -77,6 +79,8 @@ object MessageDecryptor { private val TAG = Log.tag(MessageDecryptor::class.java) + private val decryptionErrorCounts: MutableMap = LRUCache(100) + /** * Decrypts an envelope and provides a [Result]. This method has side effects, but all of them are limited to [SignalDatabase]. * That means that this operation should be atomic when performed within a transaction. @@ -125,8 +129,9 @@ object MessageDecryptor { val followUpOperations: MutableList = mutableListOf() if (envelope.type == Envelope.Type.PREKEY_BUNDLE) { + Log.i(TAG, "${logPrefix(envelope)} Prekey message. Scheduling a prekey sync job.") followUpOperations += FollowUpOperation { - PreKeysSyncJob.create() + PreKeysSyncJob.create().asChain() } } @@ -219,7 +224,7 @@ object MessageDecryptor { followUpOperations += FollowUpOperation { val sender: Recipient = Recipient.external(context, e.sender) - AutomaticSessionResetJob(sender.id, e.senderDevice, envelope.timestamp!!) + AutomaticSessionResetJob(sender.id, e.senderDevice, envelope.timestamp!!).asChain() } Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations.toUnmodifiableList()) @@ -277,36 +282,70 @@ object MessageDecryptor { val senderDevice: Int = protocolException.senderDevice val receivedTimestamp: Long = System.currentTimeMillis() val sender: Recipient = Recipient.external(context, protocolException.sender) + val senderServiceId: ServiceId? = ServiceId.parseOrNull(protocolException.sender) if (sender.isSelf) { - Log.w(TAG, "${logPrefix(envelope)} Decryption error for a sync message! Enqueuing a session reset job.") + Log.w(TAG, "${logPrefix(envelope)} Decryption error for a sync message! Enqueuing a session reset job.", true) followUpOperations += FollowUpOperation { - AutomaticSessionResetJob(sender.id, senderDevice, envelope.timestamp!!) + AutomaticSessionResetJob(sender.id, senderDevice, envelope.timestamp!!).asChain() } return Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations) } + val errorCount: DecryptionErrorCount = decryptionErrorCounts.getOrPut(sender.id) { DecryptionErrorCount(count = 0, lastReceivedTime = 0) } + val timeSinceLastError = receivedTimestamp - errorCount.lastReceivedTime + if (timeSinceLastError > FeatureFlags.retryReceiptMaxCountResetAge() && errorCount.count > 0) { + Log.i(TAG, "${logPrefix(envelope, senderServiceId)} Resetting decryption error count for ${sender.id} because it has been $timeSinceLastError ms since the last error.", true) + errorCount.count = 0 + } + + errorCount.count++ + errorCount.lastReceivedTime = receivedTimestamp + + if (errorCount.count > FeatureFlags.retryReceiptMaxCount()) { + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} This is error number ${errorCount.count} from ${sender.id}, which is greater than the maximum of ${FeatureFlags.retryReceiptMaxCount()}. Ignoring.", true) + + if (contentHint == ContentHint.IMPLICIT) { + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so no error message is needed.", true) + Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations) + } else { + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so we need to insert an error right away.", true) + return Result.DecryptionError(envelope, serverDeliveredTimestamp, protocolException.toErrorMetadata(), followUpOperations.toUnmodifiableList()) + } + } else { + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} This is error number ${errorCount.count} from ${sender.id}.${if (errorCount.count > 1) " It has been $timeSinceLastError ms since the last error." else "" }", true) + } + followUpOperations += FollowUpOperation { - buildSendRetryReceiptJob(envelope, protocolException, sender) + val retryJob = buildSendRetryReceiptJob(envelope, protocolException, sender) + + // Note: if the message is sealed sender, it's envelope type will be UNIDENTIFIED_SENDER. The only way we can currently check if the error is + // prekey-related in that situation is using a string match. + if (envelope.type == Envelope.Type.PREKEY_BUNDLE || protocolException.message?.lowercase()?.contains("prekey") == true) { + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} Got a decryption error on a prekey message. Forcing a prekey rotation before requesting the retry.", true) + PreKeysSyncJob.create(forceRotationRequested = true).asChain().then(retryJob) + } else { + retryJob.asChain() + } } return when (contentHint) { ContentHint.DEFAULT -> { - Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so we need to insert an error right away.", true) + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so we need to insert an error right away.", true) Result.DecryptionError(envelope, serverDeliveredTimestamp, protocolException.toErrorMetadata(), followUpOperations.toUnmodifiableList()) } ContentHint.RESENDABLE -> { - Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so we can try to resend the message.", true) + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so we can try to resend the message.", true) followUpOperations += FollowUpOperation { val groupId: GroupId? = protocolException.parseGroupId(envelope) val threadId: Long? = if (groupId != null) { if (SignalDatabase.groups.getGroup(groupId).isAbsent()) { - Log.w(TAG, "${logPrefix(envelope)} No group found for $groupId! Not inserting a retry receipt.") + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} No group found for $groupId! Not inserting a retry receipt.") return@FollowUpOperation null } @@ -317,7 +356,7 @@ object MessageDecryptor { } if (threadId == null) { - Log.w(TAG, "${logPrefix(envelope)} Thread does not already exist for sender ${sender.id}! We will not create one just to show a retry receipt.") + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} Thread does not already exist for sender ${sender.id}! We will not create one just to show a retry receipt.") return@FollowUpOperation null } @@ -330,7 +369,7 @@ object MessageDecryptor { } ContentHint.IMPLICIT -> { - Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so no error message is needed.", true) + Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so no error message is needed.", true) Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations) } } @@ -399,20 +438,24 @@ object MessageDecryptor { } private fun logPrefix(envelope: Envelope): String { - return logPrefix(envelope.timestamp!!, envelope.sourceServiceId ?: "", envelope.sourceDevice) + return logPrefix(envelope.timestamp!!, ServiceId.parseOrNull(envelope.sourceServiceId)?.logString() ?: "", envelope.sourceDevice) } - private fun logPrefix(envelope: Envelope, sender: ServiceId): String { - return logPrefix(envelope.timestamp!!, sender.toString(), envelope.sourceDevice) + private fun logPrefix(envelope: Envelope, sender: ServiceId?): String { + return logPrefix(envelope.timestamp!!, sender?.logString() ?: "?", envelope.sourceDevice) + } + + private fun logPrefix(envelope: Envelope, sender: String): String { + return logPrefix(envelope.timestamp!!, ServiceId.parseOrNull(sender)?.logString() ?: "?", envelope.sourceDevice) } private fun logPrefix(envelope: Envelope, cipherResult: SignalServiceCipherResult): String { - return logPrefix(envelope.timestamp!!, cipherResult.metadata.sourceServiceId.toString(), cipherResult.metadata.sourceDeviceId) + return logPrefix(envelope.timestamp!!, cipherResult.metadata.sourceServiceId.logString(), cipherResult.metadata.sourceDeviceId) } private fun logPrefix(envelope: Envelope, exception: ProtocolException): String { return if (exception.sender != null) { - logPrefix(envelope.timestamp!!, exception.sender, exception.senderDevice) + logPrefix(envelope.timestamp!!, ServiceId.parseOrNull(exception.sender)?.logString() ?: "?", exception.senderDevice) } else { logPrefix(envelope.timestamp!!, envelope.sourceServiceId, envelope.sourceDevice) } @@ -546,7 +589,12 @@ object MessageDecryptor { val groupId: GroupId? ) + data class DecryptionErrorCount( + var count: Int, + var lastReceivedTime: Long + ) + fun interface FollowUpOperation { - fun run(): Job? + fun run(): JobManager.Chain? } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/FeatureFlags.java b/app/src/main/java/org/thoughtcrime/securesms/util/FeatureFlags.java index a46bf8da2b..c46f18d7e5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/FeatureFlags.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/FeatureFlags.java @@ -121,6 +121,9 @@ public final class FeatureFlags { private static final String GIF_SEARCH = "global.gifSearch"; private static final String AUDIO_REMUXING = "android.media.audioRemux.1"; private static final String VIDEO_RECORD_1X_ZOOM = "android.media.videoCaptureDefaultZoom"; + private static final String RETRY_RECEIPT_MAX_COUNT = "android.retryReceipt.maxCount"; + private static final String RETRY_RECEIPT_MAX_COUNT_RESET_AGE = "android.retryReceipt.maxCountResetAge"; + private static final String PREKEY_FORCE_REFRESH_INTERVAL = "android.prekeyForceRefreshInterval"; /** * We will only store remote values for flags in this set. If you want a flag to be controllable @@ -194,7 +197,10 @@ public final class FeatureFlags { USE_ACTIVE_CALL_MANAGER, GIF_SEARCH, AUDIO_REMUXING, - VIDEO_RECORD_1X_ZOOM + VIDEO_RECORD_1X_ZOOM, + RETRY_RECEIPT_MAX_COUNT, + RETRY_RECEIPT_MAX_COUNT_RESET_AGE, + PREKEY_FORCE_REFRESH_INTERVAL ); @VisibleForTesting @@ -265,7 +271,10 @@ public final class FeatureFlags { NOTIFICATION_THUMBNAIL_BLOCKLIST, CALLING_RAISE_HAND, PHONE_NUMBER_PRIVACY, - VIDEO_RECORD_1X_ZOOM + VIDEO_RECORD_1X_ZOOM, + RETRY_RECEIPT_MAX_COUNT, + RETRY_RECEIPT_MAX_COUNT_RESET_AGE, + PREKEY_FORCE_REFRESH_INTERVAL ); /** @@ -454,6 +463,20 @@ public final class FeatureFlags { return getLong(RETRY_RESPOND_MAX_AGE, TimeUnit.DAYS.toMillis(14)); } + /** + * The max number of retry receipts sends we allow (within @link{#retryReceiptMaxCountResetAge()}) before we consider the volume too large and stop responding. + */ + public static long retryReceiptMaxCount() { + return getLong(RETRY_RECEIPT_MAX_COUNT, 10); + } + + /** + * If the last retry receipt send was older than this, then we reset the retry receipt sent count. (For use with @link{#retryReceiptMaxCount()}) + */ + public static long retryReceiptMaxCountResetAge() { + return getLong(RETRY_RECEIPT_MAX_COUNT_RESET_AGE, TimeUnit.HOURS.toMillis(3)); + } + /** How long a sender key can live before it needs to be rotated. */ public static long senderKeyMaxAge() { return Math.min(getLong(SENDER_KEY_MAX_AGE, TimeUnit.DAYS.toMillis(14)), TimeUnit.DAYS.toMillis(90)); @@ -698,6 +721,11 @@ public final class FeatureFlags { return getBoolean(VIDEO_RECORD_1X_ZOOM, false); } + /** How often we allow a forced prekey refresh. */ + public static long preKeyForceRefreshInterval() { + return getLong(PREKEY_FORCE_REFRESH_INTERVAL, TimeUnit.HOURS.toMillis(1)); + } + /** Only for rendering debug info. */ public static synchronized @NonNull Map getMemoryValues() { return new TreeMap<>(REMOTE_VALUES); diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/JobExtensions.kt b/app/src/main/java/org/thoughtcrime/securesms/util/JobExtensions.kt new file mode 100644 index 0000000000..dde6c12daf --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/util/JobExtensions.kt @@ -0,0 +1,15 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.util + +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.JobManager + +/** Starts a new chain with this job. */ +fun Job.asChain(): JobManager.Chain { + return ApplicationDependencies.getJobManager().startChain(this) +} diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto index c048c958d6..89823e7a9c 100644 --- a/app/src/main/protowire/JobData.proto +++ b/app/src/main/protowire/JobData.proto @@ -36,4 +36,8 @@ message AttachmentUploadJobData { uint64 attachmentId = 1; reserved /*attachmentUniqueId*/ 2; optional ResumableUpload uploadSpec = 3; +} + +message PreKeysSyncJobData { + bool forceRefreshRequested = 1; } \ No newline at end of file