Fix the UI being unresponsive during media restore.

This commit is contained in:
Greyson Parrelli
2025-11-03 09:49:47 -05:00
committed by Michelle Tang
parent 07f33d22bf
commit aed9b3afaa
2 changed files with 66 additions and 13 deletions

View File

@@ -1589,7 +1589,7 @@ class AttachmentTable(
* that the content of the attachment will never change.
*/
@Throws(MmsException::class)
fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: InputStream, offloadRestoredAt: Duration? = null, archiveRestore: Boolean = false) {
fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: InputStream, offloadRestoredAt: Duration? = null, archiveRestore: Boolean = false, notify: Boolean = true) {
Log.i(TAG, "[finalizeAttachmentAfterDownload] Finalizing downloaded data for $attachmentId. (MessageId: $mmsId, $attachmentId)")
val existingPlaceholder: DatabaseAttachment = getAttachment(attachmentId) ?: throw MmsException("No attachment found for id: $attachmentId")
@@ -1679,9 +1679,11 @@ class AttachmentTable(
threads.updateSnippetUriSilently(threadId, snippetMessageId = mmsId, attachment = PartAuthority.getAttachmentDataUri(attachmentId))
}
notifyConversationListeners(threadId)
notifyConversationListListeners()
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
if (notify) {
notifyConversationListeners(threadId)
notifyConversationListListeners()
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
if (foundDuplicate) {
if (!fileWriteResult.file.delete()) {

View File

@@ -15,6 +15,8 @@ import androidx.core.content.ContextCompat
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.Base64.decodeBase64OrThrow
import org.signal.core.util.PendingIntentFlags
import org.signal.core.util.ThreadUtil
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.isNotNullOrBlank
import org.signal.core.util.logging.Log
import org.signal.libsignal.protocol.InvalidMacException
@@ -50,6 +52,7 @@ import org.thoughtcrime.securesms.stickers.StickerLocator
import org.thoughtcrime.securesms.transport.RetryLaterException
import org.thoughtcrime.securesms.util.RemoteConfig
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.thoughtcrime.securesms.util.ThrottledDebouncer
import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream.IntegrityCheck
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
@@ -59,6 +62,9 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
import org.whispersystems.signalservice.api.push.exceptions.RangeException
import java.io.File
import java.io.IOException
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.jvm.optionals.getOrNull
import kotlin.math.abs
@@ -67,6 +73,7 @@ import kotlin.math.pow
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
/**
* Download attachment from locations as specified in their record.
@@ -121,6 +128,23 @@ class RestoreAttachmentJob private constructor(
const val KEY = "RestoreAttachmentJob"
private val TAG = Log.tag(RestoreAttachmentJob::class.java)
/**
* During media restore, we tend to hammer the database from a lot of different threads at once. This can block writes for more urgent things, like message
* sends. To reduce the impact, we put all of our database writes on a single-thread executor.
*/
private val DB_EXECUTOR = Executors.newSingleThreadExecutor(SignalExecutors.NumberedThreadFactory("restore-db", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD))
/**
* By default, downloading an attachment wants to notify a bunch of database observation listeners. This slams the observer so hard that other people using
* it will experience massive delays in notifications. To avoid this, we turn off notifications for downloads, and then use this notifier to push some
* out every so often.
*/
val DATABASE_OBSERVER_NOTIFIER = ThrottledDebouncer(5.seconds.inWholeMilliseconds)
val NOTIFY_DATABASE_OBSERVERS = {
AppDependencies.databaseObserver.notifyConversationListListeners()
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
/**
* Create a restore job for the initial large batch of media on a fresh restore.
* Will enqueue with some amount of parallelization with low job priority.
@@ -204,7 +228,9 @@ class RestoreAttachmentJob private constructor(
}
override fun onAdded() {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
}
}
@Throws(Exception::class)
@@ -259,7 +285,10 @@ class RestoreAttachmentJob private constructor(
dataStream?.use { input ->
Log.i(TAG, "[$attachmentId] Attachment is sticker, restoring from local storage")
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null, notify = false)
DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS)
}
return
}
}
@@ -285,7 +314,9 @@ class RestoreAttachmentJob private constructor(
override fun onFailure() {
if (isCanceled) {
SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED)
}
} else {
Log.w(TAG, format(this, "onFailure() messageId: $messageId attachmentId: $attachmentId"))
@@ -326,7 +357,9 @@ class RestoreAttachmentJob private constructor(
forceTransitTier: Boolean = false
) {
val maxReceiveSize: Long = RemoteConfig.maxAttachmentReceiveSizeBytes
val attachmentFile: File = SignalDatabase.attachments.getOrCreateTransferFile(attachmentId)
val attachmentFile: File = DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.getOrCreateTransferFile(attachmentId)
}
var useArchiveCdn = false
if (attachment.remoteDigest == null && attachment.dataHash == null) {
@@ -397,12 +430,16 @@ class RestoreAttachmentJob private constructor(
attachmentId = attachmentId,
inputStream = input,
offloadRestoredAt = if (manual) System.currentTimeMillis().milliseconds else null,
archiveRestore = true
archiveRestore = true,
notify = false
)
DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS)
}
if (useArchiveCdn && attachment.archiveCdn == null) {
SignalDatabase.attachments.setArchiveCdn(attachmentId, pointer.cdnNumber)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.setArchiveCdn(attachmentId, pointer.cdnNumber)
}
}
ArchiveRestoreProgress.onWriteToDiskEnd(attachmentId)
@@ -465,7 +502,9 @@ class RestoreAttachmentJob private constructor(
}
} catch (e: org.signal.libsignal.protocol.incrementalmac.InvalidMacException) {
Log.w(TAG, "[$attachmentId] Detected an invalid incremental mac. Clearing and marking as a temporary failure, requiring the user to manually try again.")
SignalDatabase.attachments.clearIncrementalMacsForAttachmentAndAnyDuplicates(attachmentId, attachment.remoteKey, attachment.dataHash)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.clearIncrementalMacsForAttachmentAndAnyDuplicates(attachmentId, attachment.remoteKey, attachment.dataHash)
}
markFailed(attachmentId)
}
@@ -473,11 +512,15 @@ class RestoreAttachmentJob private constructor(
}
private fun markFailed(attachmentId: AttachmentId) {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED)
}
}
private fun markPermanentlyFailed(attachmentId: AttachmentId) {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE)
DB_EXECUTOR.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE)
}
}
private fun maybePostFailedToDownloadFromArchiveNotification() {
@@ -520,6 +563,14 @@ class RestoreAttachmentJob private constructor(
NotificationManagerCompat.from(context).notify(NotificationIds.INTERNAL_ERROR, notification)
}
private fun <T> ExecutorService.runBlocking(block: () -> T): T {
return try {
this.submit(block).get()
} catch (e: ExecutionException) {
throw e.cause ?: e
}
}
class Factory : Job.Factory<RestoreAttachmentJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): RestoreAttachmentJob {
val data = RestoreAttachmentJobData.ADAPTER.decode(serializedData!!)