Use trailing job to clear media restore progress.

This commit is contained in:
Cody Henthorne
2024-09-05 14:18:04 -04:00
parent 390ea341ca
commit 2701b570bb
6 changed files with 92 additions and 14 deletions

View File

@@ -331,12 +331,17 @@ object BackupRepository {
fun localImport(mainStreamFactory: () -> InputStream, mainStreamLength: Long, selfData: SelfData): ImportResult {
val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey()
val frameReader = EncryptedBackupReader(
key = backupKey,
aci = selfData.aci,
length = mainStreamLength,
dataStream = mainStreamFactory
)
val frameReader = try {
EncryptedBackupReader(
key = backupKey,
aci = selfData.aci,
length = mainStreamLength,
dataStream = mainStreamFactory
)
} catch (e: IOException) {
Log.w(TAG, "Unable to import local archive", e)
return ImportResult.Failure
}
return frameReader.use { reader ->
import(backupKey, reader, selfData)

View File

@@ -79,10 +79,6 @@ class MediaRestoreProgressBanner(private val data: MediaRestoreEvent) : Banner()
val remainingAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
val completedBytes = totalRestoreSize - remainingAttachmentSize
if (remainingAttachmentSize == 0L) {
SignalStore.backup.totalRestorableAttachmentSize = 0
}
MediaRestoreEvent(completedBytes, totalRestoreSize)
}
}

View File

@@ -0,0 +1,62 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.keyvalue.SignalStore
import kotlin.time.Duration.Companion.seconds
/**
* Intended to be enqueued after the various media restore jobs to check progress to completion. When this job
* runs it expects all media to be restored and will re-enqueue a new instance if not. Re-enqueue is likely to happen
* when one of the restore queues finishes before the other(s).
*/
class CheckRestoreMediaLeftJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
const val KEY = "CheckRestoreMediaLeftJob"
private val TAG = Log.tag(CheckRestoreMediaLeftJob::class)
}
constructor(queue: String) : this(
Parameters.Builder()
.setQueue(queue)
.setLifespan(Parameters.IMMORTAL)
.setMaxAttempts(2)
.build()
)
override fun getFactoryKey(): String = KEY
override fun serialize(): ByteArray? = null
override fun run(): Result {
val remainingAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
if (remainingAttachmentSize == 0L) {
SignalStore.backup.totalRestorableAttachmentSize = 0
} else if (runAttempt == 0) {
Log.w(TAG, "Still have remaining data to restore, will retry before checking job queues, queue: ${parameters.queue} estimated remaining: $remainingAttachmentSize")
return Result.retry(15.seconds.inWholeMilliseconds)
} else {
Log.w(TAG, "Max retries reached, queue: ${parameters.queue} estimated remaining: $remainingAttachmentSize")
// todo [local-backup] inspect jobs/queues and raise some alarm/abort?
}
return Result.success()
}
override fun onFailure() = Unit
class Factory : Job.Factory<CheckRestoreMediaLeftJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): CheckRestoreMediaLeftJob {
return CheckRestoreMediaLeftJob(parameters)
}
}
}

View File

@@ -127,6 +127,7 @@ public final class JobManagerFactories {
put(CallLinkUpdateSendJob.KEY, new CallLinkUpdateSendJob.Factory());
put(CallLogEventSendJob.KEY, new CallLogEventSendJob.Factory());
put(CallSyncEventJob.KEY, new CallSyncEventJob.Factory());
put(CheckRestoreMediaLeftJob.KEY, new CheckRestoreMediaLeftJob.Factory());
put(CheckServiceReachabilityJob.KEY, new CheckServiceReachabilityJob.Factory());
put(CleanPreKeysJob.KEY, new CleanPreKeysJob.Factory());
put(ContactLinkRebuildMigrationJob.KEY, new ContactLinkRebuildMigrationJob.Factory());

View File

@@ -25,7 +25,6 @@ import org.whispersystems.signalservice.api.backup.MediaName
import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream
import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream.StreamSupplier
import java.io.IOException
import java.util.concurrent.TimeUnit
/**
* Restore attachment from local backup storage.
@@ -41,6 +40,7 @@ class RestoreLocalAttachmentJob private constructor(
companion object {
const val KEY = "RestoreLocalAttachmentJob"
val TAG = Log.tag(RestoreLocalAttachmentJob::class.java)
private const val CONCURRENT_QUEUES = 2
fun enqueueRestoreLocalAttachmentsJobs(mediaNameToFileInfo: Map<String, DocumentFileInfo>) {
var restoreAttachmentJobs: MutableList<Job>
@@ -63,7 +63,7 @@ class RestoreLocalAttachmentJob private constructor(
if (fileInfo != null) {
restorableAttachments += attachment
restoreAttachmentJobs += RestoreLocalAttachmentJob("RestoreLocalAttachmentJob_${index % 2}", attachment, fileInfo)
restoreAttachmentJobs += RestoreLocalAttachmentJob(queueName(index), attachment, fileInfo)
} else {
notRestorableAttachments += attachment
}
@@ -77,14 +77,25 @@ class RestoreLocalAttachmentJob private constructor(
AppDependencies.jobManager.addAll(restoreAttachmentJobs)
}
} while (restoreAttachmentJobs.isNotEmpty())
val checkDoneJobs = (0 until CONCURRENT_QUEUES)
.map {
CheckRestoreMediaLeftJob(queueName(it))
}
AppDependencies.jobManager.addAll(checkDoneJobs)
}
private fun queueName(index: Int): String {
return "RestoreLocalAttachmentJob_${index % CONCURRENT_QUEUES}"
}
}
private constructor(queue: String, attachment: LocalRestorableAttachment, info: DocumentFileInfo) : this(
Parameters.Builder()
.setQueue(queue)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.setLifespan(Parameters.IMMORTAL)
.setMaxAttempts(3)
.build(),
attachmentId = attachment.attachmentId,
messageId = attachment.mmsId,