Add parallelization options for archive attachment restoration.

This commit is contained in:
Greyson Parrelli
2025-08-07 16:38:23 -04:00
parent 7b88b07170
commit a549fff6fa
7 changed files with 52 additions and 34 deletions

View File

@@ -103,7 +103,6 @@ import org.thoughtcrime.securesms.jobs.AvatarGroupsV2DownloadJob
import org.thoughtcrime.securesms.jobs.BackupDeleteJob
import org.thoughtcrime.securesms.jobs.BackupMessagesJob
import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob
import org.thoughtcrime.securesms.jobs.CheckRestoreMediaLeftJob
import org.thoughtcrime.securesms.jobs.CreateReleaseChannelJob
import org.thoughtcrime.securesms.jobs.LocalBackupJob
import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob
@@ -359,13 +358,7 @@ object BackupRepository {
fun skipMediaRestore() {
SignalStore.backup.userManuallySkippedMediaRestore = true
AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED))
AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE))
AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.MANUAL))
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)))
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE)))
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.MANUAL)))
RestoreAttachmentJob.Queues.ALL.forEach { AppDependencies.jobManager.cancelAllInQueue(it) }
}
fun markBackupFailure() {

View File

@@ -91,14 +91,14 @@ class AttachmentDownloadJob private constructor(
AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS,
AttachmentTable.TRANSFER_RESTORE_OFFLOADED,
AttachmentTable.TRANSFER_NEEDS_RESTORE -> RestoreAttachmentJob.restoreAttachment(databaseAttachment)
AttachmentTable.TRANSFER_NEEDS_RESTORE -> RestoreAttachmentJob.forManualRestore(databaseAttachment)
AttachmentTable.TRANSFER_PROGRESS_PENDING,
AttachmentTable.TRANSFER_PROGRESS_FAILED -> {
if (SignalStore.backup.backsUpMedia && (databaseAttachment.remoteLocation == null || databaseAttachment.remoteDigest == null)) {
if (databaseAttachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED) {
Log.i(TAG, "Trying to restore attachment from archive cdn")
RestoreAttachmentJob.restoreAttachment(databaseAttachment)
RestoreAttachmentJob.forManualRestore(databaseAttachment)
} else {
Log.w(TAG, "No remote location, and the archive transfer state is unfinished. Can't download.")
null
@@ -206,7 +206,7 @@ class AttachmentDownloadJob private constructor(
}
Log.i(TAG, "Trying to restore attachment from archive cdn instead")
RestoreAttachmentJob.restoreAttachment(attachment)
RestoreAttachmentJob.forManualRestore(attachment)
return
}
@@ -317,7 +317,7 @@ class AttachmentDownloadJob private constructor(
} catch (e: NonSuccessfulResponseCodeException) {
if (SignalStore.backup.backsUpMedia && e.code == 404 && attachment.archiveTransferState === AttachmentTable.ArchiveTransferState.FINISHED) {
Log.i(TAG, "Retrying download from archive CDN")
RestoreAttachmentJob.restoreAttachment(attachment)
RestoreAttachmentJob.forManualRestore(attachment)
return
}

View File

@@ -116,7 +116,9 @@ class BackupRestoreMediaJob private constructor(parameters: Parameters) : BaseJo
SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE)))
RestoreAttachmentJob.Queues.INITIAL_RESTORE.forEach { queue ->
jobManager.add(CheckRestoreMediaLeftJob(queue))
}
}
private fun shouldRestoreFullSize(message: MmsMessageRecord, restoreTime: Long, optimizeStorage: Boolean): Boolean {

View File

@@ -30,6 +30,7 @@ class CheckRestoreMediaLeftJob private constructor(parameters: Parameters) : Job
Parameters.Builder()
.setQueue(queue)
.setLifespan(Parameters.IMMORTAL)
.setGlobalPriority(Parameters.PRIORITY_LOW)
.setMaxAttempts(2)
.build()
)

View File

@@ -47,8 +47,8 @@ class OptimizeMediaJob private constructor(parameters: Parameters) : Job(paramet
}
Log.i(TAG, "Canceling any previous restore optimized media jobs and cleanup progress")
AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED))
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)))
AppDependencies.jobManager.cancelAllInQueues(RestoreAttachmentJob.Queues.OFFLOAD_RESTORE)
RestoreAttachmentJob.Queues.OFFLOAD_RESTORE.forEach { queue -> AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(queue)) }
Log.i(TAG, "Optimizing media in the db")
SignalDatabase.attachments.markEligibleAttachmentsAsOptimized()

View File

@@ -68,19 +68,48 @@ class RestoreAttachmentJob private constructor(
private val manual: Boolean
) : BaseJob(parameters) {
object Queues {
/** Job queues used for the initial attachment restore post-registration. The number of queues in this set determine the level of parallelization. */
val INITIAL_RESTORE = setOf(
"RestoreAttachmentJob::InitialRestore_01",
"RestoreAttachmentJob::InitialRestore_02",
"RestoreAttachmentJob::InitialRestore_03",
"RestoreAttachmentJob::InitialRestore_04"
)
/** Job queues used when restoring an offloaded attachment. The number of queues in this set determine the level of parallelization. */
val OFFLOAD_RESTORE = setOf(
"RestoreAttachmentJob::OffloadRestore_01",
"RestoreAttachmentJob::OffloadRestore_02",
"RestoreAttachmentJob::OffloadRestore_03",
"RestoreAttachmentJob::OffloadRestore_04"
)
/** Job queues used for manual restoration. The number of queues in this set determine the level of parallelization. */
val MANUAL_RESTORE = setOf(
"RestoreAttachmentJob::ManualRestore_01",
"RestoreAttachmentJob::ManualRestore_02"
)
/** All possible queues used by this job. */
val ALL = INITIAL_RESTORE + OFFLOAD_RESTORE + MANUAL_RESTORE
}
companion object {
const val KEY = "RestoreAttachmentJob"
private val TAG = Log.tag(RestoreAttachmentJob::class.java)
/**
* Create a restore job for the initial large batch of media on a fresh restore
* Create a restore job for the initial large batch of media on a fresh restore.
* Will enqueue with some amount of parallization with low job priority.
*/
fun forInitialRestore(attachmentId: AttachmentId, messageId: Long): RestoreAttachmentJob {
return RestoreAttachmentJob(
attachmentId = attachmentId,
messageId = messageId,
manual = false,
queue = constructQueueString(RestoreOperation.INITIAL_RESTORE)
queue = Queues.INITIAL_RESTORE.random(),
priority = Parameters.PRIORITY_LOW
)
}
@@ -94,7 +123,8 @@ class RestoreAttachmentJob private constructor(
attachmentId = attachmentId,
messageId = messageId,
manual = false,
queue = constructQueueString(RestoreOperation.RESTORE_OFFLOADED)
queue = Queues.INITIAL_RESTORE.random(),
priority = Parameters.PRIORITY_LOW
)
}
@@ -104,28 +134,21 @@ class RestoreAttachmentJob private constructor(
* @return job id of the restore
*/
@JvmStatic
fun restoreAttachment(attachment: DatabaseAttachment): String {
fun forManualRestore(attachment: DatabaseAttachment): String {
val restoreJob = RestoreAttachmentJob(
messageId = attachment.mmsId,
attachmentId = attachment.attachmentId,
manual = true,
queue = constructQueueString(RestoreOperation.MANUAL)
queue = Queues.MANUAL_RESTORE.random(),
priority = Parameters.PRIORITY_DEFAULT
)
AppDependencies.jobManager.add(restoreJob)
return restoreJob.id
}
/**
* There are three modes of restore and we use separate queues for each to facilitate canceling if necessary.
*/
@JvmStatic
fun constructQueueString(restoreOperation: RestoreOperation): String {
return "RestoreAttachmentJob::${restoreOperation.name}"
}
}
private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String) : this(
private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String, priority: Int) : this(
Parameters.Builder()
.setQueue(queue)
.apply {
@@ -137,6 +160,7 @@ class RestoreAttachmentJob private constructor(
}
}
.setLifespan(TimeUnit.DAYS.toMillis(30))
.setGlobalPriority(priority)
.build(),
messageId,
attachmentId,
@@ -409,8 +433,4 @@ class RestoreAttachmentJob private constructor(
)
}
}
enum class RestoreOperation {
MANUAL, RESTORE_OFFLOADED, INITIAL_RESTORE
}
}

View File

@@ -74,7 +74,9 @@ class RestoreOptimizedMediaJob private constructor(parameters: Parameters) : Job
SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)))
RestoreAttachmentJob.Queues.OFFLOAD_RESTORE.forEach { queue ->
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(queue))
}
return Result.success()
}