diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index 95d40ce4af..9cac88faad 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -1589,7 +1589,7 @@ class AttachmentTable( * that the content of the attachment will never change. */ @Throws(MmsException::class) - fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: InputStream, offloadRestoredAt: Duration? = null, archiveRestore: Boolean = false) { + fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: InputStream, offloadRestoredAt: Duration? = null, archiveRestore: Boolean = false, notify: Boolean = true) { Log.i(TAG, "[finalizeAttachmentAfterDownload] Finalizing downloaded data for $attachmentId. (MessageId: $mmsId, $attachmentId)") val existingPlaceholder: DatabaseAttachment = getAttachment(attachmentId) ?: throw MmsException("No attachment found for id: $attachmentId") @@ -1679,9 +1679,11 @@ class AttachmentTable( threads.updateSnippetUriSilently(threadId, snippetMessageId = mmsId, attachment = PartAuthority.getAttachmentDataUri(attachmentId)) } - notifyConversationListeners(threadId) - notifyConversationListListeners() - AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + if (notify) { + notifyConversationListeners(threadId) + notifyConversationListListeners() + AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + } if (foundDuplicate) { if (!fileWriteResult.file.delete()) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt index f4fd69017c..1ceaafc396 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt @@ -15,6 +15,8 @@ import androidx.core.content.ContextCompat import org.greenrobot.eventbus.EventBus import org.signal.core.util.Base64.decodeBase64OrThrow import org.signal.core.util.PendingIntentFlags +import org.signal.core.util.ThreadUtil +import org.signal.core.util.concurrent.SignalExecutors import org.signal.core.util.isNotNullOrBlank import org.signal.core.util.logging.Log import org.signal.libsignal.protocol.InvalidMacException @@ -50,6 +52,7 @@ import org.thoughtcrime.securesms.stickers.StickerLocator import org.thoughtcrime.securesms.transport.RetryLaterException import org.thoughtcrime.securesms.util.RemoteConfig import org.thoughtcrime.securesms.util.SignalLocalMetrics +import org.thoughtcrime.securesms.util.ThrottledDebouncer import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream.IntegrityCheck import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment @@ -59,6 +62,9 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException import org.whispersystems.signalservice.api.push.exceptions.RangeException import java.io.File import java.io.IOException +import java.util.concurrent.ExecutionException +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.jvm.optionals.getOrNull import kotlin.math.abs @@ -67,6 +73,7 @@ import kotlin.math.pow import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds /** * Download attachment from locations as specified in their record. @@ -121,6 +128,23 @@ class RestoreAttachmentJob private constructor( const val KEY = "RestoreAttachmentJob" private val TAG = Log.tag(RestoreAttachmentJob::class.java) + /** + * During media restore, we tend to hammer the database from a lot of different threads at once. This can block writes for more urgent things, like message + * sends. To reduce the impact, we put all of our database writes on a single-thread executor. + */ + private val DB_EXECUTOR = Executors.newSingleThreadExecutor(SignalExecutors.NumberedThreadFactory("restore-db", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD)) + + /** + * By default, downloading an attachment wants to notify a bunch of database observation listeners. This slams the observer so hard that other people using + * it will experience massive delays in notifications. To avoid this, we turn off notifications for downloads, and then use this notifier to push some + * out every so often. + */ + val DATABASE_OBSERVER_NOTIFIER = ThrottledDebouncer(5.seconds.inWholeMilliseconds) + val NOTIFY_DATABASE_OBSERVERS = { + AppDependencies.databaseObserver.notifyConversationListListeners() + AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + } + /** * Create a restore job for the initial large batch of media on a fresh restore. * Will enqueue with some amount of parallelization with low job priority. @@ -204,7 +228,9 @@ class RestoreAttachmentJob private constructor( } override fun onAdded() { - SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS) + } } @Throws(Exception::class) @@ -259,7 +285,10 @@ class RestoreAttachmentJob private constructor( dataStream?.use { input -> Log.i(TAG, "[$attachmentId] Attachment is sticker, restoring from local storage") - SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null, notify = false) + DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS) + } return } } @@ -285,7 +314,9 @@ class RestoreAttachmentJob private constructor( override fun onFailure() { if (isCanceled) { - SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED) + } } else { Log.w(TAG, format(this, "onFailure() messageId: $messageId attachmentId: $attachmentId")) @@ -326,7 +357,9 @@ class RestoreAttachmentJob private constructor( forceTransitTier: Boolean = false ) { val maxReceiveSize: Long = RemoteConfig.maxAttachmentReceiveSizeBytes - val attachmentFile: File = SignalDatabase.attachments.getOrCreateTransferFile(attachmentId) + val attachmentFile: File = DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.getOrCreateTransferFile(attachmentId) + } var useArchiveCdn = false if (attachment.remoteDigest == null && attachment.dataHash == null) { @@ -397,12 +430,16 @@ class RestoreAttachmentJob private constructor( attachmentId = attachmentId, inputStream = input, offloadRestoredAt = if (manual) System.currentTimeMillis().milliseconds else null, - archiveRestore = true + archiveRestore = true, + notify = false ) + DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS) } if (useArchiveCdn && attachment.archiveCdn == null) { - SignalDatabase.attachments.setArchiveCdn(attachmentId, pointer.cdnNumber) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.setArchiveCdn(attachmentId, pointer.cdnNumber) + } } ArchiveRestoreProgress.onWriteToDiskEnd(attachmentId) @@ -465,7 +502,9 @@ class RestoreAttachmentJob private constructor( } } catch (e: org.signal.libsignal.protocol.incrementalmac.InvalidMacException) { Log.w(TAG, "[$attachmentId] Detected an invalid incremental mac. Clearing and marking as a temporary failure, requiring the user to manually try again.") - SignalDatabase.attachments.clearIncrementalMacsForAttachmentAndAnyDuplicates(attachmentId, attachment.remoteKey, attachment.dataHash) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.clearIncrementalMacsForAttachmentAndAnyDuplicates(attachmentId, attachment.remoteKey, attachment.dataHash) + } markFailed(attachmentId) } @@ -473,11 +512,15 @@ class RestoreAttachmentJob private constructor( } private fun markFailed(attachmentId: AttachmentId) { - SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED) + } } private fun markPermanentlyFailed(attachmentId: AttachmentId) { - SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE) + DB_EXECUTOR.runBlocking { + SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE) + } } private fun maybePostFailedToDownloadFromArchiveNotification() { @@ -520,6 +563,14 @@ class RestoreAttachmentJob private constructor( NotificationManagerCompat.from(context).notify(NotificationIds.INTERNAL_ERROR, notification) } + private fun ExecutorService.runBlocking(block: () -> T): T { + return try { + this.submit(block).get() + } catch (e: ExecutionException) { + throw e.cause ?: e + } + } + class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): RestoreAttachmentJob { val data = RestoreAttachmentJobData.ADAPTER.decode(serializedData!!)