From a549fff6fa6a6c6c077d2c2dfd0d531212d60a57 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 7 Aug 2025 16:38:23 -0400 Subject: [PATCH] Add parallelization options for archive attachment restoration. --- .../securesms/backup/v2/BackupRepository.kt | 9 +-- .../securesms/jobs/AttachmentDownloadJob.kt | 8 +-- .../securesms/jobs/BackupRestoreMediaJob.kt | 4 +- .../jobs/CheckRestoreMediaLeftJob.kt | 1 + .../securesms/jobs/OptimizeMediaJob.kt | 4 +- .../securesms/jobs/RestoreAttachmentJob.kt | 56 +++++++++++++------ .../jobs/RestoreOptimizedMediaJob.kt | 4 +- 7 files changed, 52 insertions(+), 34 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index c407db29da..b9c508a1a5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -103,7 +103,6 @@ import org.thoughtcrime.securesms.jobs.AvatarGroupsV2DownloadJob import org.thoughtcrime.securesms.jobs.BackupDeleteJob import org.thoughtcrime.securesms.jobs.BackupMessagesJob import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob -import org.thoughtcrime.securesms.jobs.CheckRestoreMediaLeftJob import org.thoughtcrime.securesms.jobs.CreateReleaseChannelJob import org.thoughtcrime.securesms.jobs.LocalBackupJob import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob @@ -359,13 +358,7 @@ object BackupRepository { fun skipMediaRestore() { SignalStore.backup.userManuallySkippedMediaRestore = true - AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)) - AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE)) - AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.MANUAL)) - - AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED))) - AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE))) - AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.MANUAL))) + RestoreAttachmentJob.Queues.ALL.forEach { AppDependencies.jobManager.cancelAllInQueue(it) } } fun markBackupFailure() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt index 62a8433696..c17f791f4a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt @@ -91,14 +91,14 @@ class AttachmentDownloadJob private constructor( AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS, AttachmentTable.TRANSFER_RESTORE_OFFLOADED, - AttachmentTable.TRANSFER_NEEDS_RESTORE -> RestoreAttachmentJob.restoreAttachment(databaseAttachment) + AttachmentTable.TRANSFER_NEEDS_RESTORE -> RestoreAttachmentJob.forManualRestore(databaseAttachment) AttachmentTable.TRANSFER_PROGRESS_PENDING, AttachmentTable.TRANSFER_PROGRESS_FAILED -> { if (SignalStore.backup.backsUpMedia && (databaseAttachment.remoteLocation == null || databaseAttachment.remoteDigest == null)) { if (databaseAttachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED) { Log.i(TAG, "Trying to restore attachment from archive cdn") - RestoreAttachmentJob.restoreAttachment(databaseAttachment) + RestoreAttachmentJob.forManualRestore(databaseAttachment) } else { Log.w(TAG, "No remote location, and the archive transfer state is unfinished. Can't download.") null @@ -206,7 +206,7 @@ class AttachmentDownloadJob private constructor( } Log.i(TAG, "Trying to restore attachment from archive cdn instead") - RestoreAttachmentJob.restoreAttachment(attachment) + RestoreAttachmentJob.forManualRestore(attachment) return } @@ -317,7 +317,7 @@ class AttachmentDownloadJob private constructor( } catch (e: NonSuccessfulResponseCodeException) { if (SignalStore.backup.backsUpMedia && e.code == 404 && attachment.archiveTransferState === AttachmentTable.ArchiveTransferState.FINISHED) { Log.i(TAG, "Retrying download from archive CDN") - RestoreAttachmentJob.restoreAttachment(attachment) + RestoreAttachmentJob.forManualRestore(attachment) return } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreMediaJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreMediaJob.kt index b7763f8b8e..7bf68b8053 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreMediaJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreMediaJob.kt @@ -116,7 +116,9 @@ class BackupRestoreMediaJob private constructor(parameters: Parameters) : BaseJo SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize() - jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE))) + RestoreAttachmentJob.Queues.INITIAL_RESTORE.forEach { queue -> + jobManager.add(CheckRestoreMediaLeftJob(queue)) + } } private fun shouldRestoreFullSize(message: MmsMessageRecord, restoreTime: Long, optimizeStorage: Boolean): Boolean { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CheckRestoreMediaLeftJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CheckRestoreMediaLeftJob.kt index 0dc6596412..b46636e36c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CheckRestoreMediaLeftJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CheckRestoreMediaLeftJob.kt @@ -30,6 +30,7 @@ class CheckRestoreMediaLeftJob private constructor(parameters: Parameters) : Job Parameters.Builder() .setQueue(queue) .setLifespan(Parameters.IMMORTAL) + .setGlobalPriority(Parameters.PRIORITY_LOW) .setMaxAttempts(2) .build() ) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/OptimizeMediaJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/OptimizeMediaJob.kt index e43ff20bd2..961148fb0f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/OptimizeMediaJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/OptimizeMediaJob.kt @@ -47,8 +47,8 @@ class OptimizeMediaJob private constructor(parameters: Parameters) : Job(paramet } Log.i(TAG, "Canceling any previous restore optimized media jobs and cleanup progress") - AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)) - AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED))) + AppDependencies.jobManager.cancelAllInQueues(RestoreAttachmentJob.Queues.OFFLOAD_RESTORE) + RestoreAttachmentJob.Queues.OFFLOAD_RESTORE.forEach { queue -> AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(queue)) } Log.i(TAG, "Optimizing media in the db") SignalDatabase.attachments.markEligibleAttachmentsAsOptimized() diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt index e428321330..df9b2879f1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt @@ -68,19 +68,48 @@ class RestoreAttachmentJob private constructor( private val manual: Boolean ) : BaseJob(parameters) { + object Queues { + /** Job queues used for the initial attachment restore post-registration. The number of queues in this set determine the level of parallelization. */ + val INITIAL_RESTORE = setOf( + "RestoreAttachmentJob::InitialRestore_01", + "RestoreAttachmentJob::InitialRestore_02", + "RestoreAttachmentJob::InitialRestore_03", + "RestoreAttachmentJob::InitialRestore_04" + ) + + /** Job queues used when restoring an offloaded attachment. The number of queues in this set determine the level of parallelization. */ + val OFFLOAD_RESTORE = setOf( + "RestoreAttachmentJob::OffloadRestore_01", + "RestoreAttachmentJob::OffloadRestore_02", + "RestoreAttachmentJob::OffloadRestore_03", + "RestoreAttachmentJob::OffloadRestore_04" + ) + + /** Job queues used for manual restoration. The number of queues in this set determine the level of parallelization. */ + val MANUAL_RESTORE = setOf( + "RestoreAttachmentJob::ManualRestore_01", + "RestoreAttachmentJob::ManualRestore_02" + ) + + /** All possible queues used by this job. */ + val ALL = INITIAL_RESTORE + OFFLOAD_RESTORE + MANUAL_RESTORE + } + companion object { const val KEY = "RestoreAttachmentJob" private val TAG = Log.tag(RestoreAttachmentJob::class.java) /** - * Create a restore job for the initial large batch of media on a fresh restore + * Create a restore job for the initial large batch of media on a fresh restore. + * Will enqueue with some amount of parallization with low job priority. */ fun forInitialRestore(attachmentId: AttachmentId, messageId: Long): RestoreAttachmentJob { return RestoreAttachmentJob( attachmentId = attachmentId, messageId = messageId, manual = false, - queue = constructQueueString(RestoreOperation.INITIAL_RESTORE) + queue = Queues.INITIAL_RESTORE.random(), + priority = Parameters.PRIORITY_LOW ) } @@ -94,7 +123,8 @@ class RestoreAttachmentJob private constructor( attachmentId = attachmentId, messageId = messageId, manual = false, - queue = constructQueueString(RestoreOperation.RESTORE_OFFLOADED) + queue = Queues.INITIAL_RESTORE.random(), + priority = Parameters.PRIORITY_LOW ) } @@ -104,28 +134,21 @@ class RestoreAttachmentJob private constructor( * @return job id of the restore */ @JvmStatic - fun restoreAttachment(attachment: DatabaseAttachment): String { + fun forManualRestore(attachment: DatabaseAttachment): String { val restoreJob = RestoreAttachmentJob( messageId = attachment.mmsId, attachmentId = attachment.attachmentId, manual = true, - queue = constructQueueString(RestoreOperation.MANUAL) + queue = Queues.MANUAL_RESTORE.random(), + priority = Parameters.PRIORITY_DEFAULT ) AppDependencies.jobManager.add(restoreJob) return restoreJob.id } - - /** - * There are three modes of restore and we use separate queues for each to facilitate canceling if necessary. - */ - @JvmStatic - fun constructQueueString(restoreOperation: RestoreOperation): String { - return "RestoreAttachmentJob::${restoreOperation.name}" - } } - private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String) : this( + private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String, priority: Int) : this( Parameters.Builder() .setQueue(queue) .apply { @@ -137,6 +160,7 @@ class RestoreAttachmentJob private constructor( } } .setLifespan(TimeUnit.DAYS.toMillis(30)) + .setGlobalPriority(priority) .build(), messageId, attachmentId, @@ -409,8 +433,4 @@ class RestoreAttachmentJob private constructor( ) } } - - enum class RestoreOperation { - MANUAL, RESTORE_OFFLOADED, INITIAL_RESTORE - } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreOptimizedMediaJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreOptimizedMediaJob.kt index 2b72ab75d4..1335eceeba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreOptimizedMediaJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreOptimizedMediaJob.kt @@ -74,7 +74,9 @@ class RestoreOptimizedMediaJob private constructor(parameters: Parameters) : Job SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize() - AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED))) + RestoreAttachmentJob.Queues.OFFLOAD_RESTORE.forEach { queue -> + AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(queue)) + } return Result.success() }