Refactor and simplify attachment archiving.

This commit is contained in:
Greyson Parrelli
2024-09-12 10:58:48 -04:00
committed by Cody Henthorne
parent 816006c67e
commit e80ebd87fe
23 changed files with 627 additions and 481 deletions

View File

@@ -7,283 +7,57 @@ package org.thoughtcrime.securesms.jobs
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.logging.Log
import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.BackupV2Event
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentBackfillJobData
import org.thoughtcrime.securesms.net.SignalNetwork
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.archive.ArchiveMediaResponse
import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
import java.io.IOException
import org.thoughtcrime.securesms.keyvalue.SignalStore
import kotlin.time.Duration.Companion.days
/**
* When run, this will find the next attachment that needs to be uploaded to the archive service and upload it.
* It will enqueue a copy of itself if it thinks there is more work to be done, and that copy will continue the upload process.
*/
class ArchiveAttachmentBackfillJob private constructor(
parameters: Parameters,
private var attachmentId: AttachmentId?,
private var uploadSpec: ResumableUpload?,
private var totalCount: Int?,
private var progress: Int?
) : Job(parameters) {
class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
private val TAG = Log.tag(ArchiveAttachmentBackfillJob::class.java)
const val KEY = "ArchiveAttachmentBackfillJob"
}
constructor(progress: Int? = null, totalCount: Int? = null) : this(
constructor() : this(
parameters = Parameters.Builder()
.setQueue("ArchiveAttachmentBackfillJob")
.setMaxInstancesForQueue(2)
.setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED)
.addConstraint(NetworkConstraint.KEY)
.build(),
attachmentId = null,
uploadSpec = null,
totalCount = totalCount,
progress = progress
.build()
)
override fun serialize(): ByteArray {
return ArchiveAttachmentBackfillJobData(
attachmentId = attachmentId?.id,
uploadSpec = uploadSpec
).encode()
}
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun run(): Result {
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, progress?.toLong() ?: 0, totalCount?.toLong() ?: 0))
var attachmentRecord: DatabaseAttachment? = if (attachmentId != null) {
Log.i(TAG, "Retrying $attachmentId")
SignalDatabase.attachments.getAttachment(attachmentId!!)
} else {
SignalDatabase.attachments.getNextAttachmentToArchiveAndMarkUploadInProgress()
}
val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload()
.map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId, forBackfill = true) }
if (attachmentRecord == null && attachmentId != null) {
Log.w(TAG, "Attachment $attachmentId was not found! Was likely deleted during the process of archiving. Re-enqueuing job with no ID.")
reenqueueWithIncrementedProgress()
return Result.success()
}
SignalStore.backup.totalAttachmentUploadCount = jobs.size.toLong()
SignalStore.backup.currentAttachmentUploadCount = 0
// TODO [backup] If we ever wanted to allow multiple instances of this job to run in parallel, this would have to be done somewhere else
if (attachmentRecord == null) {
Log.i(TAG, "No more attachments to backfill! Ensuring there's no dangling state.")
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, count = 0, estimatedTotalCount = jobs.size.toLong()))
val resetCount = SignalDatabase.attachments.resetPendingArchiveBackfills()
if (resetCount > 0) {
Log.w(TAG, "We thought we were done, but $resetCount items were still in progress! Need to run again to retry.")
AppDependencies.jobManager.add(
ArchiveAttachmentBackfillJob(
progress = (totalCount ?: resetCount) - resetCount,
totalCount = totalCount ?: resetCount
)
)
} else {
Log.i(TAG, "All good! Should be done.")
}
EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.FINISHED, count = totalCount?.toLong() ?: 0, estimatedTotalCount = totalCount?.toLong() ?: 0))
return Result.success()
}
Log.i(TAG, "Adding ${jobs.size} jobs to backfill attachments.")
AppDependencies.jobManager.addAll(jobs)
attachmentId = attachmentRecord.attachmentId
val transferState: AttachmentTable.ArchiveTransferState? = SignalDatabase.attachments.getArchiveTransferState(attachmentRecord.attachmentId)
if (transferState == null) {
Log.w(TAG, "Attachment $attachmentId was not found when looking for the transfer state! Was likely just deleted. Re-enqueuing job with no ID.")
reenqueueWithIncrementedProgress()
return Result.success()
}
Log.i(TAG, "Current state: $transferState")
if (transferState == AttachmentTable.ArchiveTransferState.FINISHED) {
Log.i(TAG, "Attachment $attachmentId is already finished. Skipping.")
reenqueueWithIncrementedProgress()
return Result.success()
}
if (transferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) {
Log.i(TAG, "Attachment $attachmentId is already marked as a permanent failure. Skipping.")
reenqueueWithIncrementedProgress()
return Result.success()
}
if (transferState == AttachmentTable.ArchiveTransferState.ATTACHMENT_TRANSFER_PENDING) {
Log.i(TAG, "Attachment $attachmentId is already marked as pending transfer, meaning it's a send attachment that will be uploaded on it's own. Skipping.")
reenqueueWithIncrementedProgress()
return Result.success()
}
if (transferState == AttachmentTable.ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS) {
if (uploadSpec == null || System.currentTimeMillis() > uploadSpec!!.timeout) {
Log.d(TAG, "Need an upload spec. Fetching...")
val (spec, result) = fetchResumableUploadSpec()
if (result != null) {
return result
}
uploadSpec = spec
} else {
Log.d(TAG, "Already have an upload spec. Continuing...")
}
val attachmentStream = try {
AttachmentUploadUtil.buildSignalServiceAttachmentStream(
context = context,
attachment = attachmentRecord,
uploadSpec = uploadSpec!!,
cancellationSignal = { this.isCanceled }
)
} catch (e: IOException) {
Log.e(TAG, "Failed to get attachment stream for $attachmentId", e)
return Result.retry(defaultBackoff())
}
Log.d(TAG, "Beginning upload...")
val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) {
is NetworkResult.Success -> result.result
is NetworkResult.ApplicationError -> throw result.throwable
is NetworkResult.NetworkError -> return Result.retry(defaultBackoff())
is NetworkResult.StatusCodeError -> return Result.retry(defaultBackoff())
}
Log.d(TAG, "Upload complete!")
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachmentRecord.attachmentId, uploadResult)
SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.BACKFILL_UPLOADED)
attachmentRecord = SignalDatabase.attachments.getAttachment(attachmentRecord.attachmentId)
}
if (attachmentRecord == null) {
Log.w(TAG, "$attachmentId was not found after uploading! Possibly deleted in a narrow race condition. Re-enqueuing job with no ID.")
reenqueueWithIncrementedProgress()
return Result.success()
}
Log.d(TAG, "Moving attachment to archive...")
return when (val result = BackupRepository.archiveMedia(attachmentRecord)) {
is NetworkResult.Success -> {
Log.d(TAG, "Move complete!")
SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.FINISHED)
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentRecord.attachmentId)
reenqueueWithIncrementedProgress()
Result.success()
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "Failed to archive ${attachmentRecord.attachmentId} due to an application error. Retrying.", result.throwable)
Result.retry(defaultBackoff())
}
is NetworkResult.NetworkError -> {
Log.w(TAG, "Encountered a transient network error. Retrying.")
Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "Failed request with status code ${result.code} for ${attachmentRecord.attachmentId}")
when (ArchiveMediaResponse.StatusCodes.from(result.code)) {
ArchiveMediaResponse.StatusCodes.BadArguments,
ArchiveMediaResponse.StatusCodes.InvalidPresentationOrSignature,
ArchiveMediaResponse.StatusCodes.InsufficientPermissions,
ArchiveMediaResponse.StatusCodes.RateLimited -> {
Result.retry(defaultBackoff())
}
ArchiveMediaResponse.StatusCodes.NoMediaSpaceRemaining -> {
// TODO [backup] This will end the process right away. We need to integrate this with client-driven retry UX.
Result.failure()
}
ArchiveMediaResponse.StatusCodes.Unknown -> {
Result.retry(defaultBackoff())
}
}
}
}
return Result.success()
}
private fun reenqueueWithIncrementedProgress() {
AppDependencies.jobManager.add(
ArchiveAttachmentBackfillJob(
totalCount = totalCount,
progress = progress?.inc()?.coerceAtMost(totalCount ?: 0)
)
)
}
override fun onFailure() {
attachmentId?.let { id ->
Log.w(TAG, "Failed to archive $id!")
}
}
private fun fetchResumableUploadSpec(): Pair<ResumableUpload?, Result?> {
return when (val spec = BackupRepository.getMediaUploadSpec()) {
is NetworkResult.Success -> {
Log.d(TAG, "Got an upload spec!")
spec.result.toProto() to null
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "Failed to get an upload spec due to an application error. Retrying.", spec.throwable)
return null to Result.retry(defaultBackoff())
}
is NetworkResult.NetworkError -> {
Log.w(TAG, "Encountered a transient network error. Retrying.")
return null to Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "Failed request with status code ${spec.code}")
when (ArchiveMediaUploadFormStatusCodes.from(spec.code)) {
ArchiveMediaUploadFormStatusCodes.BadArguments,
ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature,
ArchiveMediaUploadFormStatusCodes.InsufficientPermissions,
ArchiveMediaUploadFormStatusCodes.RateLimited -> {
return null to Result.retry(defaultBackoff())
}
ArchiveMediaUploadFormStatusCodes.Unknown -> {
return null to Result.retry(defaultBackoff())
}
}
}
}
}
override fun onFailure() = Unit
class Factory : Job.Factory<ArchiveAttachmentBackfillJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentBackfillJob {
val data = serializedData?.let { ArchiveAttachmentBackfillJobData.ADAPTER.decode(it) }
return ArchiveAttachmentBackfillJob(
parameters = parameters,
attachmentId = data?.attachmentId?.let { AttachmentId(it) },
uploadSpec = data?.uploadSpec,
totalCount = data?.totalCount,
progress = data?.count
)
return ArchiveAttachmentBackfillJob(parameters)
}
}
}

View File

@@ -1,81 +0,0 @@
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
import java.io.IOException
import java.util.concurrent.TimeUnit
/**
* Copies and re-encrypts attachments from the attachment cdn to the archive cdn.
*
* Job will fail if the attachment isn't available on the attachment cdn, use [AttachmentUploadJob] to upload first if necessary.
*/
class ArchiveAttachmentJob private constructor(private val attachmentId: AttachmentId, parameters: Parameters) : BaseJob(parameters) {
companion object {
private val TAG = Log.tag(ArchiveAttachmentJob::class.java)
const val KEY = "ArchiveAttachmentJob"
fun enqueueIfPossible(attachmentId: AttachmentId) {
if (!SignalStore.backup.backsUpMedia) {
return
}
AppDependencies.jobManager.add(ArchiveAttachmentJob(attachmentId))
}
}
constructor(attachmentId: AttachmentId) : this(
attachmentId = attachmentId,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build()
)
override fun serialize(): ByteArray = ArchiveAttachmentJobData(attachmentId.id).encode()
override fun getFactoryKey(): String = KEY
override fun onRun() {
if (!SignalStore.backup.backsUpMedia) {
Log.w(TAG, "Do not have permission to read/write to archive cdn")
return
}
val attachment = SignalDatabase.attachments.getAttachment(attachmentId)
if (attachment == null) {
Log.w(TAG, "Unable to find attachment to archive: $attachmentId")
return
}
BackupRepository.archiveMedia(attachment).successOrThrow()
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId)
SignalStore.backup.usedBackupMediaSpace += attachment.size
}
override fun onShouldRetry(e: Exception): Boolean {
return e is IOException && e !is NonSuccessfulResponseCodeException
}
override fun onFailure() = Unit
class Factory : Job.Factory<ArchiveAttachmentJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentJob {
val jobData = ArchiveAttachmentJobData.ADAPTER.decode(serializedData!!)
return ArchiveAttachmentJob(AttachmentId(jobData.attachmentId), parameters)
}
}
}

View File

@@ -210,16 +210,34 @@ class AttachmentDownloadJob private constructor(
Log.i(TAG, "Downloading push part $attachmentId")
SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_PROGRESS_STARTED)
when (attachment.cdn) {
Cdn.S3 -> retrieveAttachmentForReleaseChannel(messageId, attachmentId, attachment)
else -> retrieveAttachment(messageId, attachmentId, attachment)
val digestChanged = when (attachment.cdn) {
Cdn.S3 -> {
retrieveAttachmentForReleaseChannel(messageId, attachmentId, attachment)
false
}
else -> {
retrieveAttachment(messageId, attachmentId, attachment)
}
}
if ((attachment.cdn == Cdn.CDN_2 || attachment.cdn == Cdn.CDN_3) &&
attachment.archiveMediaId == null &&
SignalStore.backup.backsUpMedia
) {
AppDependencies.jobManager.add(ArchiveAttachmentJob(attachmentId))
if (SignalStore.backup.backsUpMedia) {
when {
attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED -> {
Log.i(TAG, "[$attachmentId] Already archived. Skipping.")
}
digestChanged -> {
Log.i(TAG, "[$attachmentId] Digest for attachment changed after download. Re-uploading to archive.")
AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId))
}
attachment.cdn !in CopyAttachmentToArchiveJob.ALLOWED_SOURCE_CDNS -> {
Log.i(TAG, "[$attachmentId] Attachment CDN doesn't support copying to archive. Re-uploading to archive.")
AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId))
}
else -> {
Log.i(TAG, "[$attachmentId] Enqueuing job to copy to archive.")
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachmentId))
}
}
}
}
@@ -234,12 +252,15 @@ class AttachmentDownloadJob private constructor(
exception is RetryLaterException
}
/**
* @return True if the digest changed as part of downloading, otherwise false.
*/
@Throws(IOException::class, RetryLaterException::class)
private fun retrieveAttachment(
messageId: Long,
attachmentId: AttachmentId,
attachment: DatabaseAttachment
) {
): Boolean {
val maxReceiveSize: Long = RemoteConfig.maxAttachmentReceiveSizeBytes
val attachmentFile: File = SignalDatabase.attachments.getOrCreateTransferFile(attachmentId)
@@ -269,7 +290,7 @@ class AttachmentDownloadJob private constructor(
progressListener
)
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, downloadResult.dataStream, downloadResult.iv)
return SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, downloadResult.dataStream, downloadResult.iv)
} catch (e: RangeException) {
Log.w(TAG, "Range exception, file size " + attachmentFile.length(), e)
if (attachmentFile.delete()) {
@@ -285,7 +306,7 @@ class AttachmentDownloadJob private constructor(
if (SignalStore.backup.backsUpMedia && e.code == 404 && attachment.archiveMediaName?.isNotEmpty() == true) {
Log.i(TAG, "Retrying download from archive CDN")
RestoreAttachmentJob.restoreAttachment(attachment)
return
return false
}
Log.w(TAG, "Experienced exception while trying to download an attachment.", e)
@@ -305,6 +326,8 @@ class AttachmentDownloadJob private constructor(
markFailed(messageId, attachmentId)
}
}
return false
}
@Throws(InvalidAttachmentException::class)

View File

@@ -177,7 +177,17 @@ class AttachmentUploadJob private constructor(
buildAttachmentStream(databaseAttachment, notification, uploadSpec!!).use { localAttachment ->
val uploadResult: AttachmentUploadResult = SignalNetwork.attachments.uploadAttachmentV4(localAttachment).successOrThrow()
SignalDatabase.attachments.finalizeAttachmentAfterUpload(databaseAttachment.attachmentId, uploadResult)
ArchiveThumbnailUploadJob.enqueueIfNecessary(databaseAttachment.attachmentId)
if (SignalStore.backup.backsUpMedia) {
when {
databaseAttachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED -> {
Log.i(TAG, "[$attachmentId] Already archived. Skipping.")
}
else -> {
Log.i(TAG, "[$attachmentId] Enqueuing job to copy to archive.")
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachmentId))
}
}
}
}
}
} catch (e: StreamResetException) {

View File

@@ -5,10 +5,8 @@
package org.thoughtcrime.securesms.jobs
import android.database.Cursor
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.BackupV2Event
import org.thoughtcrime.securesms.database.SignalDatabase
@@ -21,13 +19,12 @@ import org.thoughtcrime.securesms.providers.BlobProvider
import org.whispersystems.signalservice.api.NetworkResult
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.IOException
/**
* Job that is responsible for exporting the DB as a backup proto and
* also uploading the resulting proto.
*/
class BackupMessagesJob private constructor(parameters: Parameters) : BaseJob(parameters) {
class BackupMessagesJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
private val TAG = Log.tag(BackupMessagesJob::class.java)
@@ -66,60 +63,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : BaseJob(pa
override fun onFailure() = Unit
private fun archiveAttachments(): Boolean {
if (!SignalStore.backup.backsUpMedia) return false
val batchSize = 100
var needToBackfill = 0
var totalCount: Int
var progress = 0
SignalDatabase.attachments.getArchivableAttachments().use { cursor ->
totalCount = cursor.count
while (!cursor.isAfterLast) {
val attachments = cursor.readAttachmentBatch(batchSize)
when (val archiveResult = BackupRepository.archiveMedia(attachments)) {
is NetworkResult.Success -> {
Log.i(TAG, "Archive call successful")
for (notFound in archiveResult.result.sourceNotFoundResponses) {
val attachmentId = archiveResult.result.mediaIdToAttachmentId(notFound.mediaId)
Log.i(TAG, "Attachment $attachmentId not found on cdn, will need to re-upload")
needToBackfill++
}
for (success in archiveResult.result.successfulResponses) {
val attachmentId = archiveResult.result.mediaIdToAttachmentId(success.mediaId)
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId)
}
progress += attachments.size
}
else -> {
Log.e(TAG, "Failed to archive $archiveResult")
}
}
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, (progress - needToBackfill).toLong(), totalCount.toLong()))
}
}
if (needToBackfill > 0) {
AppDependencies.jobManager.add(ArchiveAttachmentBackfillJob(totalCount = totalCount, progress = progress - needToBackfill))
return true
}
return false
}
private fun Cursor.readAttachmentBatch(batchSize: Int): List<DatabaseAttachment> {
val attachments = ArrayList<DatabaseAttachment>()
for (i in 0 until batchSize) {
if (this.moveToNext()) {
attachments.addAll(SignalDatabase.attachments.getAttachments(this))
} else {
break
}
}
return attachments
}
override fun onRun() {
override fun run(): Result {
EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.PROGRESS_MESSAGES, count = 0, estimatedTotalCount = 0))
val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application)
@@ -127,28 +71,40 @@ class BackupMessagesJob private constructor(parameters: Parameters) : BaseJob(pa
BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false)
FileInputStream(tempBackupFile).use {
BackupRepository.uploadBackupFile(it, tempBackupFile.length())
when (val result = BackupRepository.uploadBackupFile(it, tempBackupFile.length())) {
is NetworkResult.Success -> Log.i(TAG, "Successfully uploaded backup file.")
is NetworkResult.NetworkError -> return Result.retry(defaultBackoff())
is NetworkResult.StatusCodeError -> return Result.retry(defaultBackoff())
is NetworkResult.ApplicationError -> throw result.throwable
}
}
val needBackfill = archiveAttachments()
SignalStore.backup.lastBackupProtoSize = tempBackupFile.length()
if (!tempBackupFile.delete()) {
Log.e(TAG, "Failed to delete temp backup file")
}
SignalStore.backup.lastBackupTime = System.currentTimeMillis()
if (!needBackfill) {
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, 0, 0))
try {
SignalStore.backup.usedBackupMediaSpace = (BackupRepository.getRemoteBackupUsedSpace().successOrThrow() ?: 0)
} catch (e: IOException) {
Log.e(TAG, "Failed to update used space")
SignalStore.backup.usedBackupMediaSpace = when (val result = BackupRepository.getRemoteBackupUsedSpace()) {
is NetworkResult.Success -> result.result ?: 0
is NetworkResult.NetworkError -> SignalStore.backup.usedBackupMediaSpace // TODO enqueue a secondary job to fetch the latest number -- no need to fail this one
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "Failed to get used space: ${result.code}")
SignalStore.backup.usedBackupMediaSpace
}
is NetworkResult.ApplicationError -> throw result.throwable
}
}
override fun onShouldRetry(e: Exception): Boolean = false
if (SignalDatabase.attachments.doAnyAttachmentsNeedArchiveUpload()) {
Log.i(TAG, "Enqueuing attachment backfill job.")
AppDependencies.jobManager.add(ArchiveAttachmentBackfillJob())
} else {
Log.i(TAG, "No attachments need to be uploaded, we can finish.")
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, 0, 0))
}
return Result.success()
}
class Factory : Job.Factory<BackupMessagesJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): BackupMessagesJob {

View File

@@ -0,0 +1,180 @@
package org.thoughtcrime.securesms.jobs
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.Cdn
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.BackupV2Event
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.CopyAttachmentToArchiveJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import java.lang.RuntimeException
import java.util.concurrent.TimeUnit
/**
* Copies and re-encrypts attachments from the attachment cdn to the archive cdn.
* If it's discovered that the attachment no longer exists on the attachment cdn, this job will schedule a re-upload via [UploadAttachmentToArchiveJob].
*/
class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, private val forBackfill: Boolean, parameters: Parameters) : Job(parameters) {
companion object {
private val TAG = Log.tag(CopyAttachmentToArchiveJob::class.java)
const val KEY = "CopyAttachmentToArchiveJob"
/** CDNs that we can copy data from */
val ALLOWED_SOURCE_CDNS = setOf(Cdn.CDN_2, Cdn.CDN_3)
}
constructor(attachmentId: AttachmentId, forBackfill: Boolean = false) : this(
attachmentId = attachmentId,
forBackfill = forBackfill,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.setQueue(UploadAttachmentToArchiveJob.buildQueueKey(attachmentId))
.build()
)
override fun serialize(): ByteArray = CopyAttachmentToArchiveJobData(
attachmentId = attachmentId.id,
forBackfill = forBackfill
).encode()
override fun getFactoryKey(): String = KEY
override fun onAdded() {
val transferStatus = SignalDatabase.attachments.getArchiveTransferState(attachmentId) ?: return
if (transferStatus == AttachmentTable.ArchiveTransferState.NONE || transferStatus == AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) {
Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING)
}
}
override fun run(): Result {
if (!SignalStore.backup.backsUpMedia) {
Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.")
return Result.success()
}
val attachment: DatabaseAttachment? = SignalDatabase.attachments.getAttachment(attachmentId)
if (attachment == null) {
Log.w(TAG, "[$attachmentId] Attachment no longer exists! Skipping.")
return Result.failure()
}
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED) {
Log.i(TAG, "[$attachmentId] Already finished. Skipping.")
return Result.success()
}
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) {
Log.i(TAG, "[$attachmentId] Already marked as a permanent failure. Skipping.")
return Result.failure()
}
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.NONE) {
Log.i(TAG, "[$attachmentId] Not marked as pending copy. Enqueueing an upload job instead.")
AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId))
return Result.success()
}
val result = when (val archiveResult = BackupRepository.archiveMedia(attachment)) {
is NetworkResult.Success -> {
Log.i(TAG, "[$attachmentId] Successfully copied the archive tier.")
Result.success()
}
is NetworkResult.NetworkError -> {
Log.w(TAG, "[$attachmentId] Encountered a retryable network error.", archiveResult.exception)
Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
when (archiveResult.code) {
403 -> {
// TODO [backup] What is the best way to handle this UX-wise?
Log.w(TAG, "[$attachmentId] Insufficient permissions to upload. Is the user no longer on media tier?")
Result.success()
}
410 -> {
Log.w(TAG, "[$attachmentId] The attachment no longer exists on the transit tier. Scheduling a re-upload.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId))
Result.success()
}
413 -> {
// TODO [backup] What is the best way to handle this UX-wise?
Log.w(TAG, "[$attachmentId] Insufficient storage space! Can't upload!")
Result.success()
}
else -> {
Log.w(TAG, "[$attachmentId] Got back a non-2xx status code: ${archiveResult.code}. Retrying.")
Result.retry(defaultBackoff())
}
}
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "[$attachmentId] Encountered a fatal error when trying to upload!")
Result.fatalFailure(RuntimeException(archiveResult.throwable))
}
}
if (result.isSuccess) {
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED)
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId)
SignalStore.backup.usedBackupMediaSpace += AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(attachment.size))
incrementBackfillProgressIfNecessary()
}
return result
}
override fun onFailure() {
incrementBackfillProgressIfNecessary()
}
private fun incrementBackfillProgressIfNecessary() {
if (!forBackfill) {
return
}
if (SignalStore.backup.totalAttachmentUploadCount > 0) {
SignalStore.backup.currentAttachmentUploadCount++
if (SignalStore.backup.currentAttachmentUploadCount >= SignalStore.backup.totalAttachmentUploadCount) {
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, count = 0, estimatedTotalCount = 0))
SignalStore.backup.currentAttachmentUploadCount = 0
SignalStore.backup.totalAttachmentUploadCount = 0
} else {
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, count = SignalStore.backup.currentAttachmentUploadCount, estimatedTotalCount = SignalStore.backup.totalAttachmentUploadCount))
}
}
}
class Factory : Job.Factory<CopyAttachmentToArchiveJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): CopyAttachmentToArchiveJob {
val jobData = CopyAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!)
return CopyAttachmentToArchiveJob(
attachmentId = AttachmentId(jobData.attachmentId),
forBackfill = jobData.forBackfill,
parameters = parameters
)
}
}
}

View File

@@ -106,7 +106,7 @@ public final class JobManagerFactories {
return new HashMap<String, Job.Factory>() {{
put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory());
put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory());
put(ArchiveAttachmentJob.KEY, new ArchiveAttachmentJob.Factory());
put(ApkUpdateJob.KEY, new ApkUpdateJob.Factory());
put(ArchiveAttachmentBackfillJob.KEY, new ArchiveAttachmentBackfillJob.Factory());
put(ArchiveThumbnailUploadJob.KEY, new ArchiveThumbnailUploadJob.Factory());
put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory());
@@ -134,6 +134,7 @@ public final class JobManagerFactories {
put(ContactLinkRebuildMigrationJob.KEY, new ContactLinkRebuildMigrationJob.Factory());
put(ConversationShortcutRankingUpdateJob.KEY, new ConversationShortcutRankingUpdateJob.Factory());
put(ConversationShortcutUpdateJob.KEY, new ConversationShortcutUpdateJob.Factory());
put(CopyAttachmentToArchiveJob.KEY, new CopyAttachmentToArchiveJob.Factory());
put(CreateReleaseChannelJob.KEY, new CreateReleaseChannelJob.Factory());
put(DirectoryRefreshJob.KEY, new DirectoryRefreshJob.Factory());
put(DonationReceiptRedemptionJob.KEY, new DonationReceiptRedemptionJob.Factory());
@@ -251,7 +252,7 @@ public final class JobManagerFactories {
put(ThreadUpdateJob.KEY, new ThreadUpdateJob.Factory());
put(TrimThreadJob.KEY, new TrimThreadJob.Factory());
put(TypingSendJob.KEY, new TypingSendJob.Factory());
put(ApkUpdateJob.KEY, new ApkUpdateJob.Factory());
put(UploadAttachmentToArchiveJob.KEY, new UploadAttachmentToArchiveJob.Factory());
// Migrations
put(AccountConsistencyMigrationJob.KEY, new AccountConsistencyMigrationJob.Factory());

View File

@@ -0,0 +1,204 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.UploadAttachmentToArchiveJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.net.SignalNetwork
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
import java.io.IOException
import kotlin.time.Duration.Companion.days
/**
* Given an attachmentId, this will upload the corresponding attachment to the archive cdn.
* To do this, it must first upload it to the attachment cdn, and then copy it to the archive cdn.
*/
class UploadAttachmentToArchiveJob private constructor(
private val attachmentId: AttachmentId,
private var uploadSpec: ResumableUpload?,
private val forBackfill: Boolean,
parameters: Parameters
) : Job(parameters) {
companion object {
private val TAG = Log.tag(UploadAttachmentToArchiveJob::class)
const val KEY = "UploadAttachmentToArchiveJob"
fun buildQueueKey(attachmentId: AttachmentId) = "ArchiveAttachmentJobs_${attachmentId.id}"
}
constructor(attachmentId: AttachmentId, forBackfill: Boolean = false) : this(
attachmentId = attachmentId,
uploadSpec = null,
forBackfill = forBackfill,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED)
.setQueue(buildQueueKey(attachmentId))
.build()
)
override fun serialize(): ByteArray = UploadAttachmentToArchiveJobData(
attachmentId = attachmentId.id,
forBackfill = forBackfill
).encode()
override fun getFactoryKey(): String = KEY
override fun onAdded() {
val transferStatus = SignalDatabase.attachments.getArchiveTransferState(attachmentId) ?: return
if (transferStatus == AttachmentTable.ArchiveTransferState.NONE) {
Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS}")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS)
}
}
override fun run(): Result {
if (!SignalStore.backup.backsUpMedia) {
Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.")
return Result.success()
}
val attachment: DatabaseAttachment? = SignalDatabase.attachments.getAttachment(attachmentId)
if (attachment == null) {
Log.w(TAG, "[$attachmentId] Attachment no longer exists! Skipping.")
return Result.failure()
}
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED) {
Log.i(TAG, "[$attachmentId] Already finished. Skipping.")
return Result.success()
}
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) {
Log.i(TAG, "[$attachmentId] Already marked as a permanent failure. Skipping.")
return Result.failure()
}
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.COPY_PENDING) {
Log.i(TAG, "[$attachmentId] Already marked as pending transfer. Enqueueing a copy job just in case.")
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId, forBackfill = forBackfill))
return Result.success()
}
if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) {
Log.w(TAG, "[$attachmentId] Upload spec expired! Clearing.")
uploadSpec = null
}
if (uploadSpec == null) {
Log.d(TAG, "[$attachmentId] Need an upload spec. Fetching...")
val (spec, result) = fetchResumableUploadSpec()
if (result != null) {
return result
}
uploadSpec = spec
} else {
Log.d(TAG, "[$attachmentId] Already have an upload spec. Continuing...")
}
val attachmentStream = try {
AttachmentUploadUtil.buildSignalServiceAttachmentStream(
context = context,
attachment = attachment,
uploadSpec = uploadSpec!!,
cancellationSignal = { this.isCanceled }
)
} catch (e: IOException) {
Log.e(TAG, "[$attachmentId] Failed to get attachment stream.", e)
return Result.retry(defaultBackoff())
}
Log.d(TAG, "[$attachmentId] Beginning upload...")
val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) {
is NetworkResult.Success -> result.result
is NetworkResult.ApplicationError -> throw result.throwable
is NetworkResult.NetworkError -> {
Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception)
return Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "[$attachmentId] Failed to upload due to status code error. Code: ${result.code}", result.exception)
return Result.retry(defaultBackoff())
}
}
Log.d(TAG, "[$attachmentId] Upload complete!")
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult)
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId, forBackfill = forBackfill))
return Result.success()
}
override fun onFailure() = Unit
private fun fetchResumableUploadSpec(): Pair<ResumableUpload?, Result?> {
return when (val spec = BackupRepository.getMediaUploadSpec()) {
is NetworkResult.Success -> {
Log.d(TAG, "[$attachmentId] Got an upload spec!")
spec.result.toProto() to null
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "[$attachmentId] Failed to get an upload spec due to an application error. Retrying.", spec.throwable)
return null to Result.retry(defaultBackoff())
}
is NetworkResult.NetworkError -> {
Log.w(TAG, "[$attachmentId] Encountered a transient network error. Retrying.")
return null to Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "[$attachmentId] Failed request with status code ${spec.code}")
when (ArchiveMediaUploadFormStatusCodes.from(spec.code)) {
ArchiveMediaUploadFormStatusCodes.BadArguments,
ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature,
ArchiveMediaUploadFormStatusCodes.InsufficientPermissions,
ArchiveMediaUploadFormStatusCodes.RateLimited -> {
return null to Result.retry(defaultBackoff())
}
ArchiveMediaUploadFormStatusCodes.Unknown -> {
return null to Result.retry(defaultBackoff())
}
}
}
}
}
class Factory : Job.Factory<UploadAttachmentToArchiveJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): UploadAttachmentToArchiveJob {
val data = UploadAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!)
return UploadAttachmentToArchiveJob(
attachmentId = AttachmentId(data.attachmentId),
uploadSpec = data.uploadSpec,
forBackfill = data.forBackfill,
parameters = parameters
)
}
}
}