Add optimize storage infrastructure for backupsv2.

This commit is contained in:
Cody Henthorne
2024-09-20 16:47:18 -04:00
committed by GitHub
parent 7935d12675
commit a10958ee13
16 changed files with 391 additions and 94 deletions

View File

@@ -4,6 +4,7 @@
*/
package org.thoughtcrime.securesms.jobs
import androidx.annotation.MainThread
import okio.Source
import okio.buffer
import org.greenrobot.eventbus.EventBus
@@ -82,6 +83,7 @@ class AttachmentDownloadJob private constructor(
}
@JvmStatic
@MainThread
fun downloadAttachmentIfNeeded(databaseAttachment: DatabaseAttachment): String? {
return when (val transferState = databaseAttachment.transferState) {
AttachmentTable.TRANSFER_PROGRESS_DONE -> null

View File

@@ -37,14 +37,18 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
*/
fun enqueue(pruneAbandonedRemoteMedia: Boolean = false) {
val jobManager = AppDependencies.jobManager
val chain = jobManager.startChain(BackupMessagesJob())
if (pruneAbandonedRemoteMedia) {
jobManager
.startChain(BackupMessagesJob())
.then(SyncArchivedMediaJob())
.enqueue()
} else {
jobManager.add(BackupMessagesJob())
chain.then(SyncArchivedMediaJob())
}
if (SignalStore.backup.optimizeStorage && SignalStore.backup.backsUpMedia) {
chain.then(OptimizeMediaJob())
}
chain.enqueue()
}
}
@@ -53,7 +57,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
.addConstraint(if (SignalStore.backup.backupWithCellular) NetworkConstraint.KEY else WifiConstraint.KEY)
.setMaxAttempts(3)
.setMaxInstancesForFactory(1)
.setQueue(BackfillDigestJob.QUEUE) // We want to ensure digests have been backfilled before this runs. Could eventually remove this constraint.b
.setQueue(BackfillDigestJob.QUEUE) // We want to ensure digests have been backfilled before this runs. Could eventually remove this constraint.
.build()
)

View File

@@ -7,8 +7,8 @@ package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.AttachmentTable.RestorableAttachment
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.MmsMessageRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
@@ -56,10 +56,10 @@ class BackupRestoreMediaJob private constructor(parameters: Parameters) : BaseJo
do {
val restoreThumbnailJobs: MutableList<RestoreAttachmentThumbnailJob> = mutableListOf()
val restoreFullAttachmentJobs: MutableMap<RestorableAttachment, RestoreAttachmentJob> = mutableMapOf()
val restoreFullAttachmentJobs: MutableList<RestoreAttachmentJob> = mutableListOf()
val restoreThumbnailOnlyAttachments: MutableList<RestorableAttachment> = mutableListOf()
val notRestorable: MutableList<RestorableAttachment> = mutableListOf()
val restoreThumbnailOnlyAttachmentsIds: MutableList<AttachmentId> = mutableListOf()
val notRestorable: MutableList<AttachmentId> = mutableListOf()
val attachmentBatch = SignalDatabase.attachments.getRestorableAttachments(batchSize)
val messageIds = attachmentBatch.map { it.mmsId }.toSet()
@@ -71,7 +71,7 @@ class BackupRestoreMediaJob private constructor(parameters: Parameters) : BaseJo
val message = messageMap[attachment.mmsId]
if (message == null && !isWallpaper) {
Log.w(TAG, "Unable to find message for ${attachment.attachmentId}, mmsId: ${attachment.mmsId}")
notRestorable += attachment
notRestorable += attachment.attachmentId
continue
}
@@ -82,38 +82,36 @@ class BackupRestoreMediaJob private constructor(parameters: Parameters) : BaseJo
)
if (isWallpaper || shouldRestoreFullSize(message!!, restoreTime, SignalStore.backup.optimizeStorage)) {
restoreFullAttachmentJobs += attachment to RestoreAttachmentJob(
restoreFullAttachmentJobs += RestoreAttachmentJob.forInitialRestore(
messageId = attachment.mmsId,
attachmentId = attachment.attachmentId
)
} else {
restoreThumbnailOnlyAttachments += attachment
restoreThumbnailOnlyAttachmentsIds += attachment.attachmentId
}
}
SignalDatabase.rawDatabase.withinTransaction {
// Mark not restorable thumbnails and attachments as failed
SignalDatabase.attachments.setThumbnailRestoreState(notRestorable.map { it.attachmentId }, AttachmentTable.ThumbnailRestoreState.PERMANENT_FAILURE)
SignalDatabase.attachments.setThumbnailRestoreState(notRestorable, AttachmentTable.ThumbnailRestoreState.PERMANENT_FAILURE)
SignalDatabase.attachments.setRestoreTransferState(notRestorable, AttachmentTable.TRANSFER_PROGRESS_FAILED)
// Mark restorable thumbnails and attachments as in progress
SignalDatabase.attachments.setThumbnailRestoreState(restoreThumbnailJobs.map { it.attachmentId }, AttachmentTable.ThumbnailRestoreState.IN_PROGRESS)
SignalDatabase.attachments.setRestoreTransferState(restoreFullAttachmentJobs.keys, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
// Set thumbnail only attachments as offloaded
SignalDatabase.attachments.setRestoreTransferState(restoreThumbnailOnlyAttachments, AttachmentTable.TRANSFER_RESTORE_OFFLOADED)
jobManager.addAll(restoreThumbnailJobs + restoreFullAttachmentJobs.values)
SignalDatabase.attachments.setRestoreTransferState(restoreThumbnailOnlyAttachmentsIds, AttachmentTable.TRANSFER_RESTORE_OFFLOADED)
}
// Intentionally enqueues one at a time for safer attachment transfer state management
restoreThumbnailJobs.forEach { jobManager.add(it) }
restoreFullAttachmentJobs.forEach { jobManager.add(it) }
} while (restoreThumbnailJobs.isNotEmpty() && restoreFullAttachmentJobs.isNotEmpty() && notRestorable.isNotEmpty())
SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString()))
jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.INITIAL_RESTORE)))
}
private fun shouldRestoreFullSize(message: MmsMessageRecord, restoreTime: Long, optimizeStorage: Boolean): Boolean {
return !optimizeStorage || ((restoreTime - message.dateSent) < 30.days.inWholeMilliseconds)
return !optimizeStorage || ((restoreTime - message.dateReceived) < 30.days.inWholeMilliseconds)
}
override fun onShouldRetry(e: Exception): Boolean = false

View File

@@ -188,6 +188,7 @@ public final class JobManagerFactories {
put(MultiDeviceViewOnceOpenJob.KEY, new MultiDeviceViewOnceOpenJob.Factory());
put(MultiDeviceViewedUpdateJob.KEY, new MultiDeviceViewedUpdateJob.Factory());
put(NullMessageSendJob.KEY, new NullMessageSendJob.Factory());
put(OptimizeMediaJob.KEY, new OptimizeMediaJob.Factory());
put(OptimizeMessageSearchIndexJob.KEY, new OptimizeMessageSearchIndexJob.Factory());
put(PaymentLedgerUpdateJob.KEY, new PaymentLedgerUpdateJob.Factory());
put(PaymentNotificationSendJob.KEY, new PaymentNotificationSendJob.Factory());
@@ -223,6 +224,7 @@ public final class JobManagerFactories {
put(RestoreAttachmentJob.KEY, new RestoreAttachmentJob.Factory());
put(RestoreAttachmentThumbnailJob.KEY, new RestoreAttachmentThumbnailJob.Factory());
put(RestoreLocalAttachmentJob.KEY, new RestoreLocalAttachmentJob.Factory());
put(RestoreOptimizedMediaJob.KEY, new RestoreOptimizedMediaJob.Factory());
put(RetrieveProfileAvatarJob.KEY, new RetrieveProfileAvatarJob.Factory());
put(RetrieveProfileJob.KEY, new RetrieveProfileJob.Factory());
put(RetrieveRemoteAnnouncementsJob.KEY, new RetrieveRemoteAnnouncementsJob.Factory());

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.keyvalue.SignalStore
import kotlin.time.Duration.Companion.days
/**
* Optimizes media storage by relying on backups for full copies of files and only keeping thumbnails locally.
*/
class OptimizeMediaJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
private val TAG = Log.tag(OptimizeMediaJob::class)
const val KEY = "OptimizeMediaJob"
fun enqueue() {
if (!SignalStore.backup.optimizeStorage || !SignalStore.backup.backsUpMedia) {
Log.i(TAG, "Optimize media is not enabled, skipping. backsUpMedia: ${SignalStore.backup.backsUpMedia} optimizeStorage: ${SignalStore.backup.optimizeStorage}")
return
}
AppDependencies.jobManager.add(OptimizeMediaJob())
}
}
constructor() : this(
parameters = Parameters.Builder()
.setQueue("OptimizeMediaJob")
.setMaxInstancesForQueue(2)
.setLifespan(1.days.inWholeMilliseconds)
.setMaxAttempts(3)
.build()
)
override fun run(): Result {
if (!SignalStore.backup.optimizeStorage || !SignalStore.backup.backsUpMedia) {
Log.i(TAG, "Optimize media is not enabled, aborting. backsUpMedia: ${SignalStore.backup.backsUpMedia} optimizeStorage: ${SignalStore.backup.optimizeStorage}")
return Result.success()
}
Log.i(TAG, "Canceling any previous restore optimized media jobs and cleanup progress")
AppDependencies.jobManager.cancelAllInQueue(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED))
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)))
Log.i(TAG, "Optimizing media in the db")
SignalDatabase.attachments.markEligibleAttachmentsAsOptimized()
Log.i(TAG, "Deleting abandoned attachment files")
SignalDatabase.attachments.deleteAbandonedAttachmentFiles()
return Result.success()
}
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun onFailure() = Unit
class Factory : Job.Factory<OptimizeMediaJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): OptimizeMediaJob {
return OptimizeMediaJob(parameters)
}
}
}

View File

@@ -35,6 +35,7 @@ import org.whispersystems.signalservice.api.push.exceptions.RangeException
import java.io.File
import java.io.IOException
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.milliseconds
/**
* Download attachment from locations as specified in their record.
@@ -43,45 +44,80 @@ class RestoreAttachmentJob private constructor(
parameters: Parameters,
private val messageId: Long,
private val attachmentId: AttachmentId,
private val offloaded: Boolean
private val manual: Boolean
) : BaseJob(parameters) {
companion object {
const val KEY = "RestoreAttachmentJob"
private val TAG = Log.tag(RestoreAttachmentJob::class.java)
@JvmStatic
fun constructQueueString(): String {
// TODO: decide how many queues
return "RestoreAttachmentJob"
/**
* Create a restore job for the initial large batch of media on a fresh restore
*/
fun forInitialRestore(attachmentId: AttachmentId, messageId: Long): RestoreAttachmentJob {
return RestoreAttachmentJob(
attachmentId = attachmentId,
messageId = messageId,
manual = false,
queue = constructQueueString(RestoreOperation.INITIAL_RESTORE)
)
}
/**
* Create a restore job for the large batch of media on a full media restore after disabling optimize media.
*
* See [RestoreOptimizedMediaJob].
*/
fun forOffloadedRestore(attachmentId: AttachmentId, messageId: Long): RestoreAttachmentJob {
return RestoreAttachmentJob(
attachmentId = attachmentId,
messageId = messageId,
manual = false,
queue = constructQueueString(RestoreOperation.RESTORE_OFFLOADED)
)
}
/**
* Restore an attachment when manually triggered by user interaction.
*
* @return job id of the restore
*/
@JvmStatic
fun restoreAttachment(attachment: DatabaseAttachment): String {
val restoreJob = RestoreAttachmentJob(
messageId = attachment.mmsId,
attachmentId = attachment.attachmentId,
offloaded = true
manual = true,
queue = constructQueueString(RestoreOperation.MANUAL)
)
AppDependencies.jobManager.add(restoreJob)
return restoreJob.id
}
/**
* There are three modes of restore and we use separate queues for each to facilitate canceling if necessary.
*/
@JvmStatic
fun constructQueueString(restoreOperation: RestoreOperation): String {
return "RestoreAttachmentJob::${restoreOperation.name}"
}
}
constructor(messageId: Long, attachmentId: AttachmentId, offloaded: Boolean = false) : this(
private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String) : this(
Parameters.Builder()
.setQueue(constructQueueString())
.setQueue(queue)
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.setLifespan(TimeUnit.DAYS.toMillis(30))
.setMaxAttempts(3)
.build(),
messageId,
attachmentId,
offloaded
manual
)
override fun serialize(): ByteArray? {
return RestoreAttachmentJobData(messageId = messageId, attachmentId = attachmentId.id, offloaded = offloaded).encode()
override fun serialize(): ByteArray {
return RestoreAttachmentJobData(messageId = messageId, attachmentId = attachmentId.id, manual = manual).encode()
}
override fun getFactoryKey(): String {
@@ -89,15 +125,7 @@ class RestoreAttachmentJob private constructor(
}
override fun onAdded() {
if (offloaded) {
Log.i(TAG, "onAdded() messageId: $messageId attachmentId: $attachmentId")
val attachment = SignalDatabase.attachments.getAttachment(attachmentId)
if (attachment?.transferState == AttachmentTable.TRANSFER_NEEDS_RESTORE || attachment?.transferState == AttachmentTable.TRANSFER_RESTORE_OFFLOADED) {
Log.i(TAG, "onAdded() Marking attachment restore progress as 'started'")
SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
}
}
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
}
@Throws(Exception::class)
@@ -137,9 +165,13 @@ class RestoreAttachmentJob private constructor(
}
override fun onFailure() {
Log.w(TAG, format(this, "onFailure() messageId: $messageId attachmentId: $attachmentId"))
if (isCanceled) {
SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED)
} else {
Log.w(TAG, format(this, "onFailure() messageId: $messageId attachmentId: $attachmentId"))
markFailed(messageId, attachmentId)
markFailed(messageId, attachmentId)
}
}
override fun onShouldRetry(exception: Exception): Boolean {
@@ -211,7 +243,7 @@ class RestoreAttachmentJob private constructor(
)
}
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, downloadResult.dataStream, downloadResult.iv)
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, downloadResult.dataStream, downloadResult.iv, if (manual) System.currentTimeMillis().milliseconds else null)
} catch (e: RangeException) {
val transferFile = archiveFile ?: attachmentFile
Log.w(TAG, "Range exception, file size " + transferFile.length(), e)
@@ -270,8 +302,12 @@ class RestoreAttachmentJob private constructor(
parameters = parameters,
messageId = data.messageId,
attachmentId = AttachmentId(data.attachmentId),
offloaded = data.offloaded
manual = data.manual
)
}
}
enum class RestoreOperation {
MANUAL, RESTORE_OFFLOADED, INITIAL_RESTORE
}
}

View File

@@ -73,6 +73,10 @@ class RestoreAttachmentThumbnailJob private constructor(
return KEY
}
override fun onAdded() {
SignalDatabase.attachments.setThumbnailRestoreState(attachmentId, AttachmentTable.ThumbnailRestoreState.IN_PROGRESS)
}
@Throws(Exception::class, IOException::class, InvalidAttachmentException::class, InvalidMessageException::class, MissingConfigurationException::class)
public override fun onRun() {
Log.i(TAG, "onRun() messageId: $messageId attachmentId: $attachmentId")

View File

@@ -9,7 +9,6 @@ import org.signal.core.util.Base64
import org.signal.core.util.StreamUtil
import org.signal.core.util.androidx.DocumentFileInfo
import org.signal.core.util.logging.Log
import org.signal.core.util.withinTransaction
import org.signal.libsignal.protocol.InvalidMacException
import org.signal.libsignal.protocol.InvalidMessageException
import org.thoughtcrime.securesms.attachments.AttachmentId
@@ -44,14 +43,12 @@ class RestoreLocalAttachmentJob private constructor(
private const val CONCURRENT_QUEUES = 2
fun enqueueRestoreLocalAttachmentsJobs(mediaNameToFileInfo: Map<String, DocumentFileInfo>) {
var restoreAttachmentJobs: MutableList<Job>
val jobManager = AppDependencies.jobManager
do {
val possibleRestorableAttachments: List<RestorableAttachment> = SignalDatabase.attachments.getRestorableAttachments(500)
val restorableAttachments = ArrayList<RestorableAttachment>(possibleRestorableAttachments.size)
val notRestorableAttachments = ArrayList<RestorableAttachment>(possibleRestorableAttachments.size)
restoreAttachmentJobs = ArrayList(possibleRestorableAttachments.size)
val notRestorableAttachments = ArrayList<AttachmentId>(possibleRestorableAttachments.size)
val restoreAttachmentJobs: MutableList<Job> = ArrayList(possibleRestorableAttachments.size)
possibleRestorableAttachments
.forEachIndexed { index, attachment ->
@@ -63,22 +60,21 @@ class RestoreLocalAttachmentJob private constructor(
}
if (fileInfo != null) {
restorableAttachments += attachment
restoreAttachmentJobs += RestoreLocalAttachmentJob(queueName(index), attachment, fileInfo)
} else {
notRestorableAttachments += attachment
notRestorableAttachments += attachment.attachmentId
}
}
SignalDatabase.rawDatabase.withinTransaction {
SignalDatabase.attachments.setRestoreTransferState(restorableAttachments, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
SignalDatabase.attachments.setRestoreTransferState(notRestorableAttachments, AttachmentTable.TRANSFER_PROGRESS_FAILED)
// Mark not restorable attachments as failed
SignalDatabase.attachments.setRestoreTransferState(notRestorableAttachments, AttachmentTable.TRANSFER_PROGRESS_FAILED)
SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
AppDependencies.jobManager.addAll(restoreAttachmentJobs)
}
// Intentionally enqueues one at a time for safer attachment transfer state management
restoreAttachmentJobs.forEach { jobManager.add(it) }
} while (restoreAttachmentJobs.isNotEmpty())
SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
val checkDoneJobs = (0 until CONCURRENT_QUEUES)
.map {
CheckRestoreMediaLeftJob(queueName(it))

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.keyvalue.SignalStore
/**
* Restores any media that was previously optimized and off-loaded into the user's archive. Leverages
* the same archive restore progress/flow.
*/
class RestoreOptimizedMediaJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
const val KEY = "RestoreOptimizeMediaJob"
fun enqueue() {
val job = RestoreOptimizedMediaJob()
AppDependencies.jobManager.add(job)
}
}
private constructor() : this(
parameters = Parameters.Builder()
.setQueue("RestoreOptimizeMediaJob")
.setMaxInstancesForQueue(2)
.setMaxAttempts(3)
.build()
)
override fun run(): Result {
val jobManager = AppDependencies.jobManager
SignalDatabase
.attachments
.getRestorableOptimizedAttachments()
.forEach {
val job = RestoreAttachmentJob.forOffloadedRestore(
messageId = it.mmsId,
attachmentId = it.attachmentId
)
// Intentionally enqueues one at a time for safer attachment transfer state management
jobManager.add(job)
}
SignalStore.backup.totalRestorableAttachmentSize = SignalDatabase.attachments.getRemainingRestorableAttachmentSize()
AppDependencies.jobManager.add(CheckRestoreMediaLeftJob(RestoreAttachmentJob.constructQueueString(RestoreAttachmentJob.RestoreOperation.RESTORE_OFFLOADED)))
return Result.success()
}
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun onFailure() = Unit
class Factory : Job.Factory<RestoreOptimizedMediaJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): RestoreOptimizedMediaJob {
return RestoreOptimizedMediaJob(parameters)
}
}
}