diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ArchiveDatabaseExecutor.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ArchiveDatabaseExecutor.kt new file mode 100644 index 0000000000..6ee124073e --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ArchiveDatabaseExecutor.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.backup.v2 + +import org.signal.core.util.ThreadUtil +import org.signal.core.util.concurrent.SignalExecutors +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.util.ThrottledDebouncer +import java.util.concurrent.ExecutionException +import java.util.concurrent.Executors +import kotlin.time.Duration.Companion.seconds + +/** + * During media upload/restore, we tend to hammer the database from a lot of different threads at once. This can block writes for more urgent things, like message + * sends. To reduce the impact, we put all of our database writes on a single-thread executor. + */ +object ArchiveDatabaseExecutor { + + val executor = Executors.newSingleThreadExecutor(SignalExecutors.NumberedThreadFactory("archive-db", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD)) + + /** + * By default, downloading/uploading an attachment wants to notify a bunch of database observation listeners. This slams the observer so hard that other + * people using it will experience massive delays in notifications. To avoid this, we turn off notifications for downloads, and then use this notifier to + * push some out every so often. + */ + val databaseObserverNotifier = ThrottledDebouncer(5.seconds.inWholeMilliseconds) + + val notifyAttachmentObservers = { + AppDependencies.databaseObserver.notifyConversationListListeners() + AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + } + + val notifyAttachmentAndChatListObservers = { + AppDependencies.databaseObserver.notifyConversationListListeners() + AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + } + + fun runBlocking(block: () -> T): T { + return try { + executor.submit(block).get() + } catch (e: ExecutionException) { + throw e.cause ?: e + } + } + + fun throttledNotifyAttachmentObservers() { + databaseObserverNotifier.publish(notifyAttachmentObservers) + } + + fun throttledNotifyAttachmentAndChatListObservers() { + databaseObserverNotifier.publish(notifyAttachmentAndChatListObservers) + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index b8c8744d25..81154c9346 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -924,7 +924,7 @@ class AttachmentTable( /** * Sets the archive transfer state for the given attachment and all other attachments that share the same data file. */ - fun setArchiveTransferState(id: AttachmentId, state: ArchiveTransferState) { + fun setArchiveTransferState(id: AttachmentId, state: ArchiveTransferState, notify: Boolean = true) { writableDatabase.withinTransaction { val dataFile: String = readableDatabase .select(DATA_FILE) @@ -940,14 +940,16 @@ class AttachmentTable( .run() } - AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + if (notify) { + AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + } } /** * Sets the archive transfer state for the given attachment id, remote key, and plain text hash tuple and all other attachments that * share the same data file. */ - fun setArchiveTransferState(id: AttachmentId, remoteKey: String, plaintextHash: String, state: ArchiveTransferState): Boolean { + fun setArchiveTransferState(id: AttachmentId, remoteKey: String, plaintextHash: String, state: ArchiveTransferState, notify: Boolean = true): Boolean { writableDatabase.withinTransaction { val dataFile: String = readableDatabase .select(DATA_FILE) @@ -963,7 +965,9 @@ class AttachmentTable( .run() } - AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + if (notify) { + AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() + } return true } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt index 1ea4713b41..8f0f4ff568 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt @@ -10,6 +10,7 @@ import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.attachments.PointerAttachment +import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.hadIntegrityCheckPerformed import org.thoughtcrime.securesms.backup.v2.requireThumbnailMediaName @@ -99,7 +100,9 @@ class ArchiveThumbnailUploadJob private constructor( val transferStatus = SignalDatabase.attachments.getArchiveThumbnailTransferState(attachmentId) ?: return if (transferStatus == AttachmentTable.ArchiveTransferState.NONE) { - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) + } } } @@ -117,31 +120,41 @@ class ArchiveThumbnailUploadJob private constructor( if (!MediaUtil.isImageOrVideoType(attachment.contentType)) { Log.w(TAG, "$attachmentId isn't visual media (contentType = ${attachment.contentType}). Skipping.") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (attachment.quote) { Log.w(TAG, "$attachmentId is a quote. Skipping.") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (attachment.dataHash == null || attachment.remoteKey == null) { Log.w(TAG, "$attachmentId is missing necessary ingredients for a mediaName!") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (!attachment.hadIntegrityCheckPerformed()) { Log.w(TAG, "$attachmentId has no integrity check! Cannot proceed.") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (SignalDatabase.messages.isStory(attachment.mmsId)) { Log.w(TAG, "$attachmentId is a story. Skipping.") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } @@ -150,12 +163,16 @@ class ArchiveThumbnailUploadJob private constructor( val thumbnailResult = generateThumbnailIfPossible(attachment) if (thumbnailResult == null) { Log.w(TAG, "Unable to generate a thumbnail result for $attachmentId") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) + } return Result.success() } if (isCanceled) { - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } return Result.failure() } @@ -172,7 +189,9 @@ class ArchiveThumbnailUploadJob private constructor( } if (isCanceled) { - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } return Result.failure() } @@ -199,7 +218,9 @@ class ArchiveThumbnailUploadJob private constructor( } if (isCanceled) { - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } return Result.failure() } @@ -220,15 +241,17 @@ class ArchiveThumbnailUploadJob private constructor( return when (val result = BackupRepository.copyThumbnailToArchive(attachmentPointer, attachment)) { is NetworkResult.Success -> { // save attachment thumbnail - SignalDatabase.attachments.finalizeAttachmentThumbnailAfterUpload( - attachmentId = attachmentId, - attachmentPlaintextHash = attachment.dataHash, - attachmentRemoteKey = attachment.remoteKey, - data = thumbnailResult.data - ) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.finalizeAttachmentThumbnailAfterUpload( + attachmentId = attachmentId, + attachmentPlaintextHash = attachment.dataHash, + attachmentRemoteKey = attachment.remoteKey, + data = thumbnailResult.data + ) - Log.d(TAG, "Successfully archived thumbnail for $attachmentId") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) + Log.d(TAG, "Successfully archived thumbnail for $attachmentId") + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) + } Result.success() } @@ -249,10 +272,14 @@ class ArchiveThumbnailUploadJob private constructor( override fun onFailure() { if (this.isCanceled) { Log.w(TAG, "[$attachmentId] Job was canceled, updating archive thumbnail transfer state to ${AttachmentTable.ArchiveTransferState.NONE}.") - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } } else { Log.w(TAG, "[$attachmentId] Job failed, updating archive thumbnail transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE} (if not already a permanent failure).") - SignalDatabase.attachments.setArchiveThumbnailTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveThumbnailTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt index 3e4914f547..8b981a3e9c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.Cdn import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.backup.ArchiveUploadProgress +import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.SignalDatabase @@ -65,14 +66,14 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A if (transferStatus == AttachmentTable.ArchiveTransferState.NONE || transferStatus == AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) { Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) } } override fun run(): Result { if (SignalStore.account.isLinkedDevice) { Log.w(TAG, "[$attachmentId] Linked devices don't backup media. Skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) return Result.success() } @@ -100,25 +101,25 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A if (SignalDatabase.messages.isStory(attachment.mmsId)) { Log.i(TAG, "[$attachmentId] Attachment is a story. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) return Result.success() } if (SignalDatabase.messages.isViewOnce(attachment.mmsId)) { Log.i(TAG, "[$attachmentId] Attachment is view-once. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) return Result.success() } if (SignalDatabase.messages.willMessageExpireBeforeCutoff(attachment.mmsId)) { Log.i(TAG, "[$attachmentId] Message will expire in less than 24 hours. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) return Result.success() } if (attachment.contentType == MediaUtil.LONG_TEXT) { Log.i(TAG, "[$attachmentId] Attachment is long text. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) return Result.success() } @@ -148,7 +149,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A when (archiveResult.code) { 400 -> { Log.w(TAG, "[$attachmentId] Something is invalid about our request. Possibly the length. Scheduling a re-upload. Body: ${archiveResult.exception.stringBody}") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId, canReuseUpload = false)) Result.success() } @@ -158,7 +159,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A } 410 -> { Log.w(TAG, "[$attachmentId] The attachment no longer exists on the transit tier. Scheduling a re-upload.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId, canReuseUpload = false)) Result.success() } @@ -197,7 +198,10 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A if (result.isSuccess) { Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.FINISHED}") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, attachment.remoteKey!!, attachment.dataHash!!, AttachmentTable.ArchiveTransferState.FINISHED) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferState(attachmentId, attachment.remoteKey!!, attachment.dataHash!!, AttachmentTable.ArchiveTransferState.FINISHED, notify = false) + ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers() + } if (!isCanceled && !attachment.quote) { ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) @@ -221,15 +225,26 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A override fun onFailure() { if (this.isCanceled) { Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) + } } else { Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE}.") - SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } } ArchiveUploadProgress.onAttachmentFinished(attachmentId) } + private fun setArchiveTransferStateWithDelayedNotification(attachmentId: AttachmentId, transferState: AttachmentTable.ArchiveTransferState) { + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferState(attachmentId, transferState, notify = false) + ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers() + } + } + class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): CopyAttachmentToArchiveJob { val jobData = CopyAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt index 43d18bc299..5ae05b744a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt @@ -15,8 +15,6 @@ import androidx.core.content.ContextCompat import org.greenrobot.eventbus.EventBus import org.signal.core.util.Base64.decodeBase64OrThrow import org.signal.core.util.PendingIntentFlags -import org.signal.core.util.ThreadUtil -import org.signal.core.util.concurrent.SignalExecutors import org.signal.core.util.isNotNullOrBlank import org.signal.core.util.logging.Log import org.signal.libsignal.protocol.InvalidMacException @@ -25,6 +23,7 @@ import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.attachments.InvalidAttachmentException +import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor import org.thoughtcrime.securesms.backup.v2.ArchiveRestoreProgress import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.createArchiveAttachmentPointer @@ -53,7 +52,6 @@ import org.thoughtcrime.securesms.stickers.StickerLocator import org.thoughtcrime.securesms.transport.RetryLaterException import org.thoughtcrime.securesms.util.RemoteConfig import org.thoughtcrime.securesms.util.SignalLocalMetrics -import org.thoughtcrime.securesms.util.ThrottledDebouncer import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream.IntegrityCheck import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment @@ -63,9 +61,6 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException import org.whispersystems.signalservice.api.push.exceptions.RangeException import java.io.File import java.io.IOException -import java.util.concurrent.ExecutionException -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import kotlin.jvm.optionals.getOrNull import kotlin.math.abs @@ -74,7 +69,6 @@ import kotlin.math.pow import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Duration.Companion.seconds /** * Download attachment from locations as specified in their record. @@ -129,23 +123,6 @@ class RestoreAttachmentJob private constructor( const val KEY = "RestoreAttachmentJob" private val TAG = Log.tag(RestoreAttachmentJob::class.java) - /** - * During media restore, we tend to hammer the database from a lot of different threads at once. This can block writes for more urgent things, like message - * sends. To reduce the impact, we put all of our database writes on a single-thread executor. - */ - private val DB_EXECUTOR = Executors.newSingleThreadExecutor(SignalExecutors.NumberedThreadFactory("restore-db", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD)) - - /** - * By default, downloading an attachment wants to notify a bunch of database observation listeners. This slams the observer so hard that other people using - * it will experience massive delays in notifications. To avoid this, we turn off notifications for downloads, and then use this notifier to push some - * out every so often. - */ - val DATABASE_OBSERVER_NOTIFIER = ThrottledDebouncer(5.seconds.inWholeMilliseconds) - val NOTIFY_DATABASE_OBSERVERS = { - AppDependencies.databaseObserver.notifyConversationListListeners() - AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers() - } - /** * Create a restore job for the initial large batch of media on a fresh restore. * Will enqueue with some amount of parallelization with low job priority. @@ -230,7 +207,7 @@ class RestoreAttachmentJob private constructor( } override fun onAdded() { - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS) } } @@ -287,9 +264,9 @@ class RestoreAttachmentJob private constructor( dataStream?.use { input -> Log.i(TAG, "[$attachmentId] Attachment is sticker, restoring from local storage") - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null, notify = false) - DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS) + ArchiveDatabaseExecutor.throttledNotifyAttachmentAndChatListObservers() } return } @@ -316,7 +293,7 @@ class RestoreAttachmentJob private constructor( override fun onFailure() { if (isCanceled) { - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED) } } else { @@ -359,7 +336,7 @@ class RestoreAttachmentJob private constructor( forceTransitTier: Boolean = false ) { val maxReceiveSize: Long = RemoteConfig.maxAttachmentReceiveSizeBytes - val attachmentFile: File = DB_EXECUTOR.runBlocking { + val attachmentFile: File = ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.getOrCreateTransferFile(attachmentId) } var useArchiveCdn = false @@ -435,11 +412,11 @@ class RestoreAttachmentJob private constructor( archiveRestore = true, notify = false ) - DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS) + ArchiveDatabaseExecutor.throttledNotifyAttachmentAndChatListObservers() } if (useArchiveCdn && attachment.archiveCdn == null) { - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setArchiveCdn(attachmentId, pointer.cdnNumber) } } @@ -510,7 +487,7 @@ class RestoreAttachmentJob private constructor( } } catch (e: org.signal.libsignal.protocol.incrementalmac.InvalidMacException) { Log.w(TAG, "[$attachmentId] Detected an invalid incremental mac. Clearing and marking as a temporary failure, requiring the user to manually try again.") - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.clearIncrementalMacsForAttachmentAndAnyDuplicates(attachmentId, attachment.remoteKey, attachment.dataHash) } markFailed(attachmentId) @@ -520,13 +497,13 @@ class RestoreAttachmentJob private constructor( } private fun markFailed(attachmentId: AttachmentId) { - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED) } } private fun markPermanentlyFailed(attachmentId: AttachmentId) { - DB_EXECUTOR.runBlocking { + ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE) } } @@ -571,14 +548,6 @@ class RestoreAttachmentJob private constructor( NotificationManagerCompat.from(context).notify(NotificationIds.INTERNAL_ERROR, notification) } - private fun ExecutorService.runBlocking(block: () -> T): T { - return try { - this.submit(block).get() - } catch (e: ExecutionException) { - throw e.cause ?: e - } - } - class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): RestoreAttachmentJob { val data = RestoreAttachmentJobData.ADAPTER.decode(serializedData!!) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt index 7fbf8bc8a0..6e5e948210 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt @@ -8,6 +8,7 @@ import org.signal.core.util.logging.Log import org.signal.libsignal.protocol.InvalidMessageException import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.InvalidAttachmentException +import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.createArchiveThumbnailPointer import org.thoughtcrime.securesms.backup.v2.requireThumbnailMediaName @@ -141,7 +142,9 @@ class RestoreAttachmentThumbnailJob private constructor( ) decryptingStream.use { input -> - SignalDatabase.attachments.finalizeAttachmentThumbnailAfterDownload(attachmentId, attachment.dataHash, attachment.remoteKey, input, thumbnailTransferFile) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.finalizeAttachmentThumbnailAfterDownload(attachmentId, attachment.dataHash, attachment.remoteKey, input, thumbnailTransferFile) + } } if (!SignalDatabase.messages.isStory(messageId)) { @@ -152,7 +155,9 @@ class RestoreAttachmentThumbnailJob private constructor( override fun onFailure() { Log.w(TAG, format(this, "onFailure() thumbnail messageId: $messageId attachmentId: $attachmentId ")) - SignalDatabase.attachments.setThumbnailRestoreProgressFailed(attachmentId, messageId) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setThumbnailRestoreProgressFailed(attachmentId, messageId) + } } override fun onShouldRetry(exception: Exception): Boolean { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index e0d934469d..53ed5b54a7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -16,6 +16,7 @@ import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.backup.ArchiveUploadProgress +import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.SignalDatabase @@ -97,7 +98,9 @@ class UploadAttachmentToArchiveJob private constructor( if (transferStatus == AttachmentTable.ArchiveTransferState.NONE) { Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS}") - SignalDatabase.attachments.setArchiveTransferStateUnlessPermanentFailure(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferStateUnlessPermanentFailure(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) + } } } @@ -109,13 +112,17 @@ class UploadAttachmentToArchiveJob private constructor( if (SignalStore.account.isLinkedDevice) { Log.w(TAG, "[$attachmentId] Linked devices don't backup media. Skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (!SignalStore.backup.backsUpMedia) { Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } @@ -149,25 +156,33 @@ class UploadAttachmentToArchiveJob private constructor( if (SignalDatabase.messages.isStory(attachment.mmsId)) { Log.i(TAG, "[$attachmentId] Attachment is a story. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (SignalDatabase.messages.isViewOnce(attachment.mmsId)) { Log.i(TAG, "[$attachmentId] Attachment is a view-once. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (SignalDatabase.messages.willMessageExpireBeforeCutoff(attachment.mmsId)) { Log.i(TAG, "[$attachmentId] Message will expire within 24 hours. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } if (attachment.contentType == MediaUtil.LONG_TEXT) { Log.i(TAG, "[$attachmentId] Attachment is long text. Resetting transfer state to none and skipping.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } return Result.success() } @@ -226,7 +241,9 @@ class UploadAttachmentToArchiveJob private constructor( ) } catch (e: FileNotFoundException) { Log.w(TAG, "[$attachmentId] No file exists for this attachment! Marking as a permanent failure.", e) - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) + } return Result.failure() } catch (e: IOException) { Log.w(TAG, "[$attachmentId] Failed while reading the stream.", e) @@ -249,7 +266,9 @@ class UploadAttachmentToArchiveJob private constructor( .use { it.readLength() } if (actualLength != attachment.size) { Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception) - SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength) + } } else { Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.") } @@ -272,7 +291,9 @@ class UploadAttachmentToArchiveJob private constructor( } Log.d(TAG, "[$attachmentId] Upload complete!") - SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult) + } } if (!isCanceled) { @@ -287,10 +308,14 @@ class UploadAttachmentToArchiveJob private constructor( override fun onFailure() { if (this.isCanceled) { Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.NONE}.") - SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } } else { Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE} (if not already a permanent failure).") - SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } } } @@ -334,6 +359,13 @@ class UploadAttachmentToArchiveJob private constructor( } } + private fun setArchiveTransferStateWithDelayedNotification(attachmentId: AttachmentId, transferState: AttachmentTable.ArchiveTransferState) { + ArchiveDatabaseExecutor.runBlocking { + SignalDatabase.attachments.setArchiveTransferState(attachmentId, transferState, notify = false) + ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers() + } + } + class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): UploadAttachmentToArchiveJob { val data = UploadAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!)