From e80ebd87fe0faa53a137042dd747f25ec382c564 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 12 Sep 2024 10:58:48 -0400 Subject: [PATCH] Refactor and simplify attachment archiving. --- ...ageProcessorTest_synchronizeDeleteForMe.kt | 1 + .../attachments/DatabaseAttachment.kt | 7 + .../securesms/backup/v2/BackupRepository.kt | 3 +- .../InternalBackupPlaygroundViewModel.kt | 6 +- .../securesms/database/AttachmentTable.kt | 81 +++--- .../securesms/database/MediaTable.kt | 1 + .../securesms/database/MessageTable.kt | 1 + .../jobs/ArchiveAttachmentBackfillJob.kt | 256 +----------------- .../securesms/jobs/ArchiveAttachmentJob.kt | 81 ------ .../securesms/jobs/AttachmentDownloadJob.kt | 45 ++- .../securesms/jobs/AttachmentUploadJob.kt | 12 +- .../securesms/jobs/BackupMessagesJob.kt | 94 ++----- .../jobs/CopyAttachmentToArchiveJob.kt | 180 ++++++++++++ .../securesms/jobs/JobManagerFactories.java | 5 +- .../jobs/UploadAttachmentToArchiveJob.kt | 204 ++++++++++++++ .../securesms/keyvalue/BackupValues.kt | 13 + app/src/main/protowire/JobData.proto | 22 +- .../database/AttachmentTransformer.kt | 25 +- .../sms/UploadDependencyGraphTest.kt | 1 + .../securesms/database/FakeMessageRecords.kt | 4 +- .../signalservice/api/archive/ArchiveApi.kt | 7 + .../api/crypto/AttachmentCipherStreamUtil.kt | 26 +- .../crypto/AttachmentCipherStreamUtilTest.kt | 33 +++ 23 files changed, 627 insertions(+), 481 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentJob.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt create mode 100644 libsignal-service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtilTest.kt diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_synchronizeDeleteForMe.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_synchronizeDeleteForMe.kt index acaf56c94c..30c02e80df 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_synchronizeDeleteForMe.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_synchronizeDeleteForMe.kt @@ -706,6 +706,7 @@ class SyncMessageProcessorTest_synchronizeDeleteForMe { archiveMediaName = this.archiveMediaName, archiveMediaId = this.archiveMediaId, thumbnailRestoreState = this.thumbnailRestoreState, + archiveTransferState = this.archiveTransferState, uuid = uuid ) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachment.kt b/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachment.kt index 246b784399..bb5c6f2f0f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachment.kt @@ -38,6 +38,9 @@ class DatabaseAttachment : Attachment { @JvmField val thumbnailRestoreState: AttachmentTable.ThumbnailRestoreState + @JvmField + val archiveTransferState: AttachmentTable.ArchiveTransferState + private val hasArchiveThumbnail: Boolean private val hasThumbnail: Boolean val displayOrder: Int @@ -78,6 +81,7 @@ class DatabaseAttachment : Attachment { archiveMediaName: String?, archiveMediaId: String?, thumbnailRestoreState: AttachmentTable.ThumbnailRestoreState, + archiveTransferState: AttachmentTable.ArchiveTransferState, uuid: UUID? ) : super( contentType = contentType, @@ -116,6 +120,7 @@ class DatabaseAttachment : Attachment { this.archiveMediaName = archiveMediaName this.archiveMediaId = archiveMediaId this.thumbnailRestoreState = thumbnailRestoreState + this.archiveTransferState = archiveTransferState } constructor(parcel: Parcel) : super(parcel) { @@ -130,6 +135,7 @@ class DatabaseAttachment : Attachment { archiveMediaId = parcel.readString() hasArchiveThumbnail = ParcelUtil.readBoolean(parcel) thumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.deserialize(parcel.readInt()) + archiveTransferState = AttachmentTable.ArchiveTransferState.deserialize(parcel.readInt()) } override fun writeToParcel(dest: Parcel, flags: Int) { @@ -145,6 +151,7 @@ class DatabaseAttachment : Attachment { dest.writeString(archiveMediaId) ParcelUtil.writeBoolean(dest, hasArchiveThumbnail) dest.writeInt(thumbnailRestoreState.value) + dest.writeInt(archiveTransferState.value) } override val uri: Uri? diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index ad2c84bcae..639a03b240 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -541,7 +541,7 @@ object BackupRepository { * * @return True if successful, otherwise false. */ - fun uploadBackupFile(backupStream: InputStream, backupStreamLength: Long): Boolean { + fun uploadBackupFile(backupStream: InputStream, backupStreamLength: Long): NetworkResult { val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey() return initBackupAndFetchAuth(backupKey) @@ -559,7 +559,6 @@ object BackupRepository { SignalNetwork.archive.uploadBackupFile(form, resumableUploadUrl, backupStream, backupStreamLength) .also { Log.i(TAG, "UploadBackupFileResult: $it") } } - .also { Log.i(TAG, "OverallResult: $it") } is NetworkResult.Success } fun downloadBackupFile(destination: File, listener: ProgressListener? = null): Boolean { diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt index 49dc564d95..320b4fcaa4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt @@ -31,11 +31,11 @@ import org.thoughtcrime.securesms.backup.v2.local.SnapshotFileSystem import org.thoughtcrime.securesms.database.MessageType import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies -import org.thoughtcrime.securesms.jobs.ArchiveAttachmentJob import org.thoughtcrime.securesms.jobs.AttachmentUploadJob import org.thoughtcrime.securesms.jobs.BackupMessagesJob import org.thoughtcrime.securesms.jobs.BackupRestoreJob import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob +import org.thoughtcrime.securesms.jobs.CopyAttachmentToArchiveJob import org.thoughtcrime.securesms.jobs.RestoreAttachmentJob import org.thoughtcrime.securesms.jobs.RestoreAttachmentThumbnailJob import org.thoughtcrime.securesms.jobs.RestoreLocalAttachmentJob @@ -175,7 +175,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() { _state.value = _state.value.copy(uploadState = BackupUploadState.UPLOAD_IN_PROGRESS) disposables += Single - .fromCallable { BackupRepository.uploadBackupFile(backupData!!.inputStream(), backupData!!.size.toLong()) } + .fromCallable { BackupRepository.uploadBackupFile(backupData!!.inputStream(), backupData!!.size.toLong()) is NetworkResult.Success } .subscribeOn(Schedulers.io()) .subscribe { success -> _state.value = _state.value.copy(uploadState = if (success) BackupUploadState.UPLOAD_DONE else BackupUploadState.UPLOAD_FAILED) @@ -295,7 +295,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() { AppDependencies .jobManager .startChain(AttachmentUploadJob(attachmentId)) - .then(ArchiveAttachmentJob(attachmentId)) + .then(CopyAttachmentToArchiveJob(attachmentId)) .enqueueAndBlockUntilCompletion(15.seconds.inWholeMilliseconds) } .subscribeOn(Schedulers.io()) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index ce14547910..c9b706f4f0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -224,6 +224,7 @@ class AttachmentTable( ARCHIVE_TRANSFER_FILE, THUMBNAIL_FILE, THUMBNAIL_RESTORE_STATE, + ARCHIVE_TRANSFER_STATE, ATTACHMENT_UUID ) @@ -526,30 +527,26 @@ class AttachmentTable( } /** - * Finds the next eligible attachment that needs to be uploaded to the archive service. - * If it exists, it'll also atomically be marked as [ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS]. + * Finds all of the attachmentIds of attachments that need to be uploaded to the archive cdn. */ - fun getNextAttachmentToArchiveAndMarkUploadInProgress(): DatabaseAttachment? { - return writableDatabase.withinTransaction { - val record: DatabaseAttachment? = readableDatabase - .select(*PROJECTION) - .from(TABLE_NAME) - .where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value) - .orderBy("$ID DESC") - .limit(1) - .run() - .readToSingleObject { it.readAttachment() } + fun getAttachmentsThatNeedArchiveUpload(): List { + return readableDatabase + .select(ID) + .from(TABLE_NAME) + .where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value) + .orderBy("$ID DESC") + .run() + .readToList { AttachmentId(it.requireLong(ID)) } + } - if (record != null) { - writableDatabase - .update(TABLE_NAME) - .values(ARCHIVE_TRANSFER_STATE to ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS.value) - .where("$ID = ?", record.attachmentId) - .run() - } - - record - } + /** + * Similar to [getAttachmentsThatNeedArchiveUpload], but returns if the list would be non-null in a more efficient way. + */ + fun doAnyAttachmentsNeedArchiveUpload(): Boolean { + return readableDatabase + .exists(TABLE_NAME) + .where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value) + .run() } /** @@ -584,19 +581,6 @@ class AttachmentTable( } } - /** - * Resets any in-progress archive backfill states to [ArchiveTransferState.NONE], returning the number that had to be reset. - * This should only be called if you believe the backfill process has finished. In this case, if this returns a value > 0, - * it indicates that state was mis-tracked and you should try uploading again. - */ - fun resetPendingArchiveBackfills(): Int { - return writableDatabase - .update(TABLE_NAME) - .values(ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value) - .where("$ARCHIVE_TRANSFER_STATE == ${ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS.value} || $ARCHIVE_TRANSFER_STATE == ${ArchiveTransferState.BACKFILL_UPLOADED.value}") - .run() - } - fun deleteAttachmentsForMessage(mmsId: Long): Boolean { Log.d(TAG, "[deleteAttachmentsForMessage] mmsId: $mmsId") @@ -940,9 +924,11 @@ class AttachmentTable( * When we find out about a new inbound attachment pointer, we insert a row for it that contains all the info we need to download it via [insertAttachmentWithData]. * Later, we download the data for that pointer. Call this method once you have the data to associate it with the attachment. At this point, it is assumed * that the content of the attachment will never change. + * + * @return True if we had to change the digest as part of saving the file, otherwise false. */ @Throws(MmsException::class) - fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: LimitedInputStream, iv: ByteArray?) { + fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: LimitedInputStream, iv: ByteArray?): Boolean { Log.i(TAG, "[finalizeAttachmentAfterDownload] Finalizing downloaded data for $attachmentId. (MessageId: $mmsId, $attachmentId)") val existingPlaceholder: DatabaseAttachment = getAttachment(attachmentId) ?: throw MmsException("No attachment found for id: $attachmentId") @@ -969,6 +955,8 @@ class AttachmentTable( cipherOutputStream.transmittedDigest } + val digestChanged = !digest.contentEquals(existingPlaceholder.remoteDigest) + val foundDuplicate = writableDatabase.withinTransaction { db -> // We can look and see if we have any exact matches on hash_ends and dedupe the file if we see one. // We don't look at hash_start here because that could result in us matching on a file that got compressed down to something smaller, effectively lowering @@ -1048,6 +1036,8 @@ class AttachmentTable( if (MediaUtil.isAudio(existingPlaceholder)) { GenerateAudioWaveFormJob.enqueue(existingPlaceholder.attachmentId) } + + return digestChanged } @Throws(IOException::class) @@ -1673,6 +1663,7 @@ class AttachmentTable( archiveMediaId = jsonObject.getString(ARCHIVE_MEDIA_ID), hasArchiveThumbnail = !TextUtils.isEmpty(jsonObject.getString(THUMBNAIL_FILE)), thumbnailRestoreState = ThumbnailRestoreState.deserialize(jsonObject.getInt(THUMBNAIL_RESTORE_STATE)), + archiveTransferState = ArchiveTransferState.deserialize(jsonObject.getInt(ARCHIVE_TRANSFER_STATE)), uuid = UuidUtil.parseOrNull(jsonObject.getString(ATTACHMENT_UUID)) ) } @@ -2273,6 +2264,7 @@ class AttachmentTable( archiveMediaId = cursor.requireString(ARCHIVE_MEDIA_ID), hasArchiveThumbnail = !cursor.isNull(THUMBNAIL_FILE), thumbnailRestoreState = ThumbnailRestoreState.deserialize(cursor.requireInt(THUMBNAIL_RESTORE_STATE)), + archiveTransferState = ArchiveTransferState.deserialize(cursor.requireInt(ARCHIVE_TRANSFER_STATE)), uuid = UuidUtil.parseOrNull(cursor.requireString(ATTACHMENT_UUID)) ) } @@ -2513,13 +2505,13 @@ class AttachmentTable( * * The first is the backfill process, which will happen after newly-enabling backups. That process will go: * 1. [NONE] - * 2. [BACKFILL_UPLOAD_IN_PROGRESS] - * 3. [BACKFILL_UPLOADED] + * 2. [UPLOAD_IN_PROGRESS] + * 3. [COPY_PENDING] * 4. [FINISHED] or [PERMANENT_FAILURE] * * The second is when newly sending/receiving an attachment after enabling backups. That process will go: * 1. [NONE] - * 2. [ATTACHMENT_TRANSFER_PENDING] + * 2. [COPY_PENDING] * 3. [FINISHED] or [PERMANENT_FAILURE] */ enum class ArchiveTransferState(val value: Int) { @@ -2527,19 +2519,16 @@ class AttachmentTable( NONE(0), /** The upload to the attachment service is in progress. */ - BACKFILL_UPLOAD_IN_PROGRESS(1), + UPLOAD_IN_PROGRESS(1), - /** Successfully uploaded to the attachment service during the backfill process. Still need to tell the service to move the file over to the archive service. */ - BACKFILL_UPLOADED(2), + /** We sent/received this attachment after enabling backups, but still need to transfer the file to the archive service. */ + COPY_PENDING(2), /** Completely finished backing up the attachment. */ FINISHED(3), /** It is impossible to upload this attachment. */ - PERMANENT_FAILURE(4), - - /** We sent/received this attachment after enabling backups, but still need to transfer the file to the archive service. */ - ATTACHMENT_TRANSFER_PENDING(5); + PERMANENT_FAILURE(4); companion object { fun deserialize(value: Int): ArchiveTransferState { diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MediaTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/MediaTable.kt index 380f4e6c0f..e377d1c39a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MediaTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MediaTable.kt @@ -57,6 +57,7 @@ class MediaTable internal constructor(context: Context?, databaseHelper: SignalD ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ARCHIVE_MEDIA_NAME}, ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ARCHIVE_MEDIA_ID}, ${AttachmentTable.TABLE_NAME}.${AttachmentTable.THUMBNAIL_RESTORE_STATE}, + ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ARCHIVE_TRANSFER_STATE}, ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ATTACHMENT_UUID}, ${MessageTable.TABLE_NAME}.${MessageTable.TYPE}, ${MessageTable.TABLE_NAME}.${MessageTable.DATE_SENT}, diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt index 81c278f9b5..e3c0b8ac7a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt @@ -391,6 +391,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat '${AttachmentTable.ARCHIVE_MEDIA_NAME}', ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ARCHIVE_MEDIA_NAME}, '${AttachmentTable.ARCHIVE_MEDIA_ID}', ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ARCHIVE_MEDIA_ID}, '${AttachmentTable.THUMBNAIL_RESTORE_STATE}', ${AttachmentTable.TABLE_NAME}.${AttachmentTable.THUMBNAIL_RESTORE_STATE}, + '${AttachmentTable.ARCHIVE_TRANSFER_STATE}', ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ARCHIVE_TRANSFER_STATE}, '${AttachmentTable.ATTACHMENT_UUID}', ${AttachmentTable.TABLE_NAME}.${AttachmentTable.ATTACHMENT_UUID} ) ) AS ${AttachmentTable.ATTACHMENT_JSON_ALIAS} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt index 764e083c4a..79a38245f8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt @@ -7,283 +7,57 @@ package org.thoughtcrime.securesms.jobs import org.greenrobot.eventbus.EventBus import org.signal.core.util.logging.Log -import org.signal.protos.resumableuploads.ResumableUpload -import org.thoughtcrime.securesms.attachments.AttachmentId -import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil -import org.thoughtcrime.securesms.attachments.DatabaseAttachment -import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.BackupV2Event -import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobmanager.Job -import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint -import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentBackfillJobData -import org.thoughtcrime.securesms.net.SignalNetwork -import org.whispersystems.signalservice.api.NetworkResult -import org.whispersystems.signalservice.api.archive.ArchiveMediaResponse -import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes -import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult -import java.io.IOException +import org.thoughtcrime.securesms.keyvalue.SignalStore import kotlin.time.Duration.Companion.days /** * When run, this will find the next attachment that needs to be uploaded to the archive service and upload it. * It will enqueue a copy of itself if it thinks there is more work to be done, and that copy will continue the upload process. */ -class ArchiveAttachmentBackfillJob private constructor( - parameters: Parameters, - private var attachmentId: AttachmentId?, - private var uploadSpec: ResumableUpload?, - private var totalCount: Int?, - private var progress: Int? -) : Job(parameters) { +class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) : Job(parameters) { companion object { private val TAG = Log.tag(ArchiveAttachmentBackfillJob::class.java) const val KEY = "ArchiveAttachmentBackfillJob" } - constructor(progress: Int? = null, totalCount: Int? = null) : this( + constructor() : this( parameters = Parameters.Builder() .setQueue("ArchiveAttachmentBackfillJob") .setMaxInstancesForQueue(2) .setLifespan(30.days.inWholeMilliseconds) .setMaxAttempts(Parameters.UNLIMITED) - .addConstraint(NetworkConstraint.KEY) - .build(), - attachmentId = null, - uploadSpec = null, - totalCount = totalCount, - progress = progress + .build() ) - override fun serialize(): ByteArray { - return ArchiveAttachmentBackfillJobData( - attachmentId = attachmentId?.id, - uploadSpec = uploadSpec - ).encode() - } + override fun serialize(): ByteArray? = null override fun getFactoryKey(): String = KEY override fun run(): Result { - EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, progress?.toLong() ?: 0, totalCount?.toLong() ?: 0)) - var attachmentRecord: DatabaseAttachment? = if (attachmentId != null) { - Log.i(TAG, "Retrying $attachmentId") - SignalDatabase.attachments.getAttachment(attachmentId!!) - } else { - SignalDatabase.attachments.getNextAttachmentToArchiveAndMarkUploadInProgress() - } + val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload() + .map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId, forBackfill = true) } - if (attachmentRecord == null && attachmentId != null) { - Log.w(TAG, "Attachment $attachmentId was not found! Was likely deleted during the process of archiving. Re-enqueuing job with no ID.") - reenqueueWithIncrementedProgress() - return Result.success() - } + SignalStore.backup.totalAttachmentUploadCount = jobs.size.toLong() + SignalStore.backup.currentAttachmentUploadCount = 0 - // TODO [backup] If we ever wanted to allow multiple instances of this job to run in parallel, this would have to be done somewhere else - if (attachmentRecord == null) { - Log.i(TAG, "No more attachments to backfill! Ensuring there's no dangling state.") + EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, count = 0, estimatedTotalCount = jobs.size.toLong())) - val resetCount = SignalDatabase.attachments.resetPendingArchiveBackfills() - if (resetCount > 0) { - Log.w(TAG, "We thought we were done, but $resetCount items were still in progress! Need to run again to retry.") - AppDependencies.jobManager.add( - ArchiveAttachmentBackfillJob( - progress = (totalCount ?: resetCount) - resetCount, - totalCount = totalCount ?: resetCount - ) - ) - } else { - Log.i(TAG, "All good! Should be done.") - } - EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.FINISHED, count = totalCount?.toLong() ?: 0, estimatedTotalCount = totalCount?.toLong() ?: 0)) - return Result.success() - } + Log.i(TAG, "Adding ${jobs.size} jobs to backfill attachments.") + AppDependencies.jobManager.addAll(jobs) - attachmentId = attachmentRecord.attachmentId - - val transferState: AttachmentTable.ArchiveTransferState? = SignalDatabase.attachments.getArchiveTransferState(attachmentRecord.attachmentId) - if (transferState == null) { - Log.w(TAG, "Attachment $attachmentId was not found when looking for the transfer state! Was likely just deleted. Re-enqueuing job with no ID.") - reenqueueWithIncrementedProgress() - return Result.success() - } - - Log.i(TAG, "Current state: $transferState") - - if (transferState == AttachmentTable.ArchiveTransferState.FINISHED) { - Log.i(TAG, "Attachment $attachmentId is already finished. Skipping.") - reenqueueWithIncrementedProgress() - return Result.success() - } - - if (transferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) { - Log.i(TAG, "Attachment $attachmentId is already marked as a permanent failure. Skipping.") - reenqueueWithIncrementedProgress() - return Result.success() - } - - if (transferState == AttachmentTable.ArchiveTransferState.ATTACHMENT_TRANSFER_PENDING) { - Log.i(TAG, "Attachment $attachmentId is already marked as pending transfer, meaning it's a send attachment that will be uploaded on it's own. Skipping.") - reenqueueWithIncrementedProgress() - return Result.success() - } - - if (transferState == AttachmentTable.ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS) { - if (uploadSpec == null || System.currentTimeMillis() > uploadSpec!!.timeout) { - Log.d(TAG, "Need an upload spec. Fetching...") - - val (spec, result) = fetchResumableUploadSpec() - if (result != null) { - return result - } - uploadSpec = spec - } else { - Log.d(TAG, "Already have an upload spec. Continuing...") - } - - val attachmentStream = try { - AttachmentUploadUtil.buildSignalServiceAttachmentStream( - context = context, - attachment = attachmentRecord, - uploadSpec = uploadSpec!!, - cancellationSignal = { this.isCanceled } - ) - } catch (e: IOException) { - Log.e(TAG, "Failed to get attachment stream for $attachmentId", e) - return Result.retry(defaultBackoff()) - } - - Log.d(TAG, "Beginning upload...") - val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) { - is NetworkResult.Success -> result.result - is NetworkResult.ApplicationError -> throw result.throwable - is NetworkResult.NetworkError -> return Result.retry(defaultBackoff()) - is NetworkResult.StatusCodeError -> return Result.retry(defaultBackoff()) - } - Log.d(TAG, "Upload complete!") - - SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachmentRecord.attachmentId, uploadResult) - SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.BACKFILL_UPLOADED) - - attachmentRecord = SignalDatabase.attachments.getAttachment(attachmentRecord.attachmentId) - } - - if (attachmentRecord == null) { - Log.w(TAG, "$attachmentId was not found after uploading! Possibly deleted in a narrow race condition. Re-enqueuing job with no ID.") - reenqueueWithIncrementedProgress() - return Result.success() - } - - Log.d(TAG, "Moving attachment to archive...") - return when (val result = BackupRepository.archiveMedia(attachmentRecord)) { - is NetworkResult.Success -> { - Log.d(TAG, "Move complete!") - - SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) - ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentRecord.attachmentId) - reenqueueWithIncrementedProgress() - Result.success() - } - - is NetworkResult.ApplicationError -> { - Log.w(TAG, "Failed to archive ${attachmentRecord.attachmentId} due to an application error. Retrying.", result.throwable) - Result.retry(defaultBackoff()) - } - - is NetworkResult.NetworkError -> { - Log.w(TAG, "Encountered a transient network error. Retrying.") - Result.retry(defaultBackoff()) - } - - is NetworkResult.StatusCodeError -> { - Log.w(TAG, "Failed request with status code ${result.code} for ${attachmentRecord.attachmentId}") - - when (ArchiveMediaResponse.StatusCodes.from(result.code)) { - ArchiveMediaResponse.StatusCodes.BadArguments, - ArchiveMediaResponse.StatusCodes.InvalidPresentationOrSignature, - ArchiveMediaResponse.StatusCodes.InsufficientPermissions, - ArchiveMediaResponse.StatusCodes.RateLimited -> { - Result.retry(defaultBackoff()) - } - - ArchiveMediaResponse.StatusCodes.NoMediaSpaceRemaining -> { - // TODO [backup] This will end the process right away. We need to integrate this with client-driven retry UX. - Result.failure() - } - - ArchiveMediaResponse.StatusCodes.Unknown -> { - Result.retry(defaultBackoff()) - } - } - } - } + return Result.success() } - private fun reenqueueWithIncrementedProgress() { - AppDependencies.jobManager.add( - ArchiveAttachmentBackfillJob( - totalCount = totalCount, - progress = progress?.inc()?.coerceAtMost(totalCount ?: 0) - ) - ) - } - - override fun onFailure() { - attachmentId?.let { id -> - Log.w(TAG, "Failed to archive $id!") - } - } - - private fun fetchResumableUploadSpec(): Pair { - return when (val spec = BackupRepository.getMediaUploadSpec()) { - is NetworkResult.Success -> { - Log.d(TAG, "Got an upload spec!") - spec.result.toProto() to null - } - - is NetworkResult.ApplicationError -> { - Log.w(TAG, "Failed to get an upload spec due to an application error. Retrying.", spec.throwable) - return null to Result.retry(defaultBackoff()) - } - - is NetworkResult.NetworkError -> { - Log.w(TAG, "Encountered a transient network error. Retrying.") - return null to Result.retry(defaultBackoff()) - } - - is NetworkResult.StatusCodeError -> { - Log.w(TAG, "Failed request with status code ${spec.code}") - - when (ArchiveMediaUploadFormStatusCodes.from(spec.code)) { - ArchiveMediaUploadFormStatusCodes.BadArguments, - ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature, - ArchiveMediaUploadFormStatusCodes.InsufficientPermissions, - ArchiveMediaUploadFormStatusCodes.RateLimited -> { - return null to Result.retry(defaultBackoff()) - } - - ArchiveMediaUploadFormStatusCodes.Unknown -> { - return null to Result.retry(defaultBackoff()) - } - } - } - } - } + override fun onFailure() = Unit class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentBackfillJob { - val data = serializedData?.let { ArchiveAttachmentBackfillJobData.ADAPTER.decode(it) } - - return ArchiveAttachmentBackfillJob( - parameters = parameters, - attachmentId = data?.attachmentId?.let { AttachmentId(it) }, - uploadSpec = data?.uploadSpec, - totalCount = data?.totalCount, - progress = data?.count - ) + return ArchiveAttachmentBackfillJob(parameters) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentJob.kt deleted file mode 100644 index f487523cd3..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentJob.kt +++ /dev/null @@ -1,81 +0,0 @@ -package org.thoughtcrime.securesms.jobs - -import org.signal.core.util.logging.Log -import org.thoughtcrime.securesms.attachments.AttachmentId -import org.thoughtcrime.securesms.backup.v2.BackupRepository -import org.thoughtcrime.securesms.database.SignalDatabase -import org.thoughtcrime.securesms.dependencies.AppDependencies -import org.thoughtcrime.securesms.jobmanager.Job -import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint -import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentJobData -import org.thoughtcrime.securesms.keyvalue.SignalStore -import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException -import java.io.IOException -import java.util.concurrent.TimeUnit - -/** - * Copies and re-encrypts attachments from the attachment cdn to the archive cdn. - * - * Job will fail if the attachment isn't available on the attachment cdn, use [AttachmentUploadJob] to upload first if necessary. - */ -class ArchiveAttachmentJob private constructor(private val attachmentId: AttachmentId, parameters: Parameters) : BaseJob(parameters) { - - companion object { - private val TAG = Log.tag(ArchiveAttachmentJob::class.java) - - const val KEY = "ArchiveAttachmentJob" - - fun enqueueIfPossible(attachmentId: AttachmentId) { - if (!SignalStore.backup.backsUpMedia) { - return - } - - AppDependencies.jobManager.add(ArchiveAttachmentJob(attachmentId)) - } - } - - constructor(attachmentId: AttachmentId) : this( - attachmentId = attachmentId, - parameters = Parameters.Builder() - .addConstraint(NetworkConstraint.KEY) - .setLifespan(TimeUnit.DAYS.toMillis(1)) - .setMaxAttempts(Parameters.UNLIMITED) - .build() - ) - - override fun serialize(): ByteArray = ArchiveAttachmentJobData(attachmentId.id).encode() - - override fun getFactoryKey(): String = KEY - - override fun onRun() { - if (!SignalStore.backup.backsUpMedia) { - Log.w(TAG, "Do not have permission to read/write to archive cdn") - return - } - - val attachment = SignalDatabase.attachments.getAttachment(attachmentId) - - if (attachment == null) { - Log.w(TAG, "Unable to find attachment to archive: $attachmentId") - return - } - - BackupRepository.archiveMedia(attachment).successOrThrow() - ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) - - SignalStore.backup.usedBackupMediaSpace += attachment.size - } - - override fun onShouldRetry(e: Exception): Boolean { - return e is IOException && e !is NonSuccessfulResponseCodeException - } - - override fun onFailure() = Unit - - class Factory : Job.Factory { - override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentJob { - val jobData = ArchiveAttachmentJobData.ADAPTER.decode(serializedData!!) - return ArchiveAttachmentJob(AttachmentId(jobData.attachmentId), parameters) - } - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt index 3ef57eb49f..cb30352d1e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt @@ -210,16 +210,34 @@ class AttachmentDownloadJob private constructor( Log.i(TAG, "Downloading push part $attachmentId") SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_PROGRESS_STARTED) - when (attachment.cdn) { - Cdn.S3 -> retrieveAttachmentForReleaseChannel(messageId, attachmentId, attachment) - else -> retrieveAttachment(messageId, attachmentId, attachment) + val digestChanged = when (attachment.cdn) { + Cdn.S3 -> { + retrieveAttachmentForReleaseChannel(messageId, attachmentId, attachment) + false + } + else -> { + retrieveAttachment(messageId, attachmentId, attachment) + } } - if ((attachment.cdn == Cdn.CDN_2 || attachment.cdn == Cdn.CDN_3) && - attachment.archiveMediaId == null && - SignalStore.backup.backsUpMedia - ) { - AppDependencies.jobManager.add(ArchiveAttachmentJob(attachmentId)) + if (SignalStore.backup.backsUpMedia) { + when { + attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED -> { + Log.i(TAG, "[$attachmentId] Already archived. Skipping.") + } + digestChanged -> { + Log.i(TAG, "[$attachmentId] Digest for attachment changed after download. Re-uploading to archive.") + AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId)) + } + attachment.cdn !in CopyAttachmentToArchiveJob.ALLOWED_SOURCE_CDNS -> { + Log.i(TAG, "[$attachmentId] Attachment CDN doesn't support copying to archive. Re-uploading to archive.") + AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId)) + } + else -> { + Log.i(TAG, "[$attachmentId] Enqueuing job to copy to archive.") + AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachmentId)) + } + } } } @@ -234,12 +252,15 @@ class AttachmentDownloadJob private constructor( exception is RetryLaterException } + /** + * @return True if the digest changed as part of downloading, otherwise false. + */ @Throws(IOException::class, RetryLaterException::class) private fun retrieveAttachment( messageId: Long, attachmentId: AttachmentId, attachment: DatabaseAttachment - ) { + ): Boolean { val maxReceiveSize: Long = RemoteConfig.maxAttachmentReceiveSizeBytes val attachmentFile: File = SignalDatabase.attachments.getOrCreateTransferFile(attachmentId) @@ -269,7 +290,7 @@ class AttachmentDownloadJob private constructor( progressListener ) - SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, downloadResult.dataStream, downloadResult.iv) + return SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, downloadResult.dataStream, downloadResult.iv) } catch (e: RangeException) { Log.w(TAG, "Range exception, file size " + attachmentFile.length(), e) if (attachmentFile.delete()) { @@ -285,7 +306,7 @@ class AttachmentDownloadJob private constructor( if (SignalStore.backup.backsUpMedia && e.code == 404 && attachment.archiveMediaName?.isNotEmpty() == true) { Log.i(TAG, "Retrying download from archive CDN") RestoreAttachmentJob.restoreAttachment(attachment) - return + return false } Log.w(TAG, "Experienced exception while trying to download an attachment.", e) @@ -305,6 +326,8 @@ class AttachmentDownloadJob private constructor( markFailed(messageId, attachmentId) } } + + return false } @Throws(InvalidAttachmentException::class) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt index d9bd34d68a..10ea49fc4e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt @@ -177,7 +177,17 @@ class AttachmentUploadJob private constructor( buildAttachmentStream(databaseAttachment, notification, uploadSpec!!).use { localAttachment -> val uploadResult: AttachmentUploadResult = SignalNetwork.attachments.uploadAttachmentV4(localAttachment).successOrThrow() SignalDatabase.attachments.finalizeAttachmentAfterUpload(databaseAttachment.attachmentId, uploadResult) - ArchiveThumbnailUploadJob.enqueueIfNecessary(databaseAttachment.attachmentId) + if (SignalStore.backup.backsUpMedia) { + when { + databaseAttachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED -> { + Log.i(TAG, "[$attachmentId] Already archived. Skipping.") + } + else -> { + Log.i(TAG, "[$attachmentId] Enqueuing job to copy to archive.") + AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachmentId)) + } + } + } } } } catch (e: StreamResetException) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt index 52e68c0cc6..4d6fa60a4f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt @@ -5,10 +5,8 @@ package org.thoughtcrime.securesms.jobs -import android.database.Cursor import org.greenrobot.eventbus.EventBus import org.signal.core.util.logging.Log -import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.BackupV2Event import org.thoughtcrime.securesms.database.SignalDatabase @@ -21,13 +19,12 @@ import org.thoughtcrime.securesms.providers.BlobProvider import org.whispersystems.signalservice.api.NetworkResult import java.io.FileInputStream import java.io.FileOutputStream -import java.io.IOException /** * Job that is responsible for exporting the DB as a backup proto and * also uploading the resulting proto. */ -class BackupMessagesJob private constructor(parameters: Parameters) : BaseJob(parameters) { +class BackupMessagesJob private constructor(parameters: Parameters) : Job(parameters) { companion object { private val TAG = Log.tag(BackupMessagesJob::class.java) @@ -66,60 +63,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : BaseJob(pa override fun onFailure() = Unit - private fun archiveAttachments(): Boolean { - if (!SignalStore.backup.backsUpMedia) return false - - val batchSize = 100 - var needToBackfill = 0 - var totalCount: Int - var progress = 0 - SignalDatabase.attachments.getArchivableAttachments().use { cursor -> - totalCount = cursor.count - while (!cursor.isAfterLast) { - val attachments = cursor.readAttachmentBatch(batchSize) - - when (val archiveResult = BackupRepository.archiveMedia(attachments)) { - is NetworkResult.Success -> { - Log.i(TAG, "Archive call successful") - for (notFound in archiveResult.result.sourceNotFoundResponses) { - val attachmentId = archiveResult.result.mediaIdToAttachmentId(notFound.mediaId) - Log.i(TAG, "Attachment $attachmentId not found on cdn, will need to re-upload") - needToBackfill++ - } - for (success in archiveResult.result.successfulResponses) { - val attachmentId = archiveResult.result.mediaIdToAttachmentId(success.mediaId) - ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) - } - progress += attachments.size - } - - else -> { - Log.e(TAG, "Failed to archive $archiveResult") - } - } - EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, (progress - needToBackfill).toLong(), totalCount.toLong())) - } - } - if (needToBackfill > 0) { - AppDependencies.jobManager.add(ArchiveAttachmentBackfillJob(totalCount = totalCount, progress = progress - needToBackfill)) - return true - } - return false - } - - private fun Cursor.readAttachmentBatch(batchSize: Int): List { - val attachments = ArrayList() - for (i in 0 until batchSize) { - if (this.moveToNext()) { - attachments.addAll(SignalDatabase.attachments.getAttachments(this)) - } else { - break - } - } - return attachments - } - - override fun onRun() { + override fun run(): Result { EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.PROGRESS_MESSAGES, count = 0, estimatedTotalCount = 0)) val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application) @@ -127,28 +71,40 @@ class BackupMessagesJob private constructor(parameters: Parameters) : BaseJob(pa BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false) FileInputStream(tempBackupFile).use { - BackupRepository.uploadBackupFile(it, tempBackupFile.length()) + when (val result = BackupRepository.uploadBackupFile(it, tempBackupFile.length())) { + is NetworkResult.Success -> Log.i(TAG, "Successfully uploaded backup file.") + is NetworkResult.NetworkError -> return Result.retry(defaultBackoff()) + is NetworkResult.StatusCodeError -> return Result.retry(defaultBackoff()) + is NetworkResult.ApplicationError -> throw result.throwable + } } - val needBackfill = archiveAttachments() SignalStore.backup.lastBackupProtoSize = tempBackupFile.length() if (!tempBackupFile.delete()) { Log.e(TAG, "Failed to delete temp backup file") } SignalStore.backup.lastBackupTime = System.currentTimeMillis() - - if (!needBackfill) { - EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, 0, 0)) - try { - SignalStore.backup.usedBackupMediaSpace = (BackupRepository.getRemoteBackupUsedSpace().successOrThrow() ?: 0) - } catch (e: IOException) { - Log.e(TAG, "Failed to update used space") + SignalStore.backup.usedBackupMediaSpace = when (val result = BackupRepository.getRemoteBackupUsedSpace()) { + is NetworkResult.Success -> result.result ?: 0 + is NetworkResult.NetworkError -> SignalStore.backup.usedBackupMediaSpace // TODO enqueue a secondary job to fetch the latest number -- no need to fail this one + is NetworkResult.StatusCodeError -> { + Log.w(TAG, "Failed to get used space: ${result.code}") + SignalStore.backup.usedBackupMediaSpace } + is NetworkResult.ApplicationError -> throw result.throwable } - } - override fun onShouldRetry(e: Exception): Boolean = false + if (SignalDatabase.attachments.doAnyAttachmentsNeedArchiveUpload()) { + Log.i(TAG, "Enqueuing attachment backfill job.") + AppDependencies.jobManager.add(ArchiveAttachmentBackfillJob()) + } else { + Log.i(TAG, "No attachments need to be uploaded, we can finish.") + EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, 0, 0)) + } + + return Result.success() + } class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): BackupMessagesJob { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt new file mode 100644 index 0000000000..fe02a6c45b --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -0,0 +1,180 @@ +package org.thoughtcrime.securesms.jobs + +import org.greenrobot.eventbus.EventBus +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.attachments.AttachmentId +import org.thoughtcrime.securesms.attachments.Cdn +import org.thoughtcrime.securesms.attachments.DatabaseAttachment +import org.thoughtcrime.securesms.backup.v2.BackupRepository +import org.thoughtcrime.securesms.backup.v2.BackupV2Event +import org.thoughtcrime.securesms.database.AttachmentTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.protos.CopyAttachmentToArchiveJobData +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil +import org.whispersystems.signalservice.internal.crypto.PaddingInputStream +import java.lang.RuntimeException +import java.util.concurrent.TimeUnit + +/** + * Copies and re-encrypts attachments from the attachment cdn to the archive cdn. + * If it's discovered that the attachment no longer exists on the attachment cdn, this job will schedule a re-upload via [UploadAttachmentToArchiveJob]. + */ +class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, private val forBackfill: Boolean, parameters: Parameters) : Job(parameters) { + + companion object { + private val TAG = Log.tag(CopyAttachmentToArchiveJob::class.java) + + const val KEY = "CopyAttachmentToArchiveJob" + + /** CDNs that we can copy data from */ + val ALLOWED_SOURCE_CDNS = setOf(Cdn.CDN_2, Cdn.CDN_3) + } + + constructor(attachmentId: AttachmentId, forBackfill: Boolean = false) : this( + attachmentId = attachmentId, + forBackfill = forBackfill, + parameters = Parameters.Builder() + .addConstraint(NetworkConstraint.KEY) + .setLifespan(TimeUnit.DAYS.toMillis(1)) + .setMaxAttempts(Parameters.UNLIMITED) + .setQueue(UploadAttachmentToArchiveJob.buildQueueKey(attachmentId)) + .build() + ) + + override fun serialize(): ByteArray = CopyAttachmentToArchiveJobData( + attachmentId = attachmentId.id, + forBackfill = forBackfill + ).encode() + + override fun getFactoryKey(): String = KEY + + override fun onAdded() { + val transferStatus = SignalDatabase.attachments.getArchiveTransferState(attachmentId) ?: return + + if (transferStatus == AttachmentTable.ArchiveTransferState.NONE || transferStatus == AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) { + Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) + } + } + + override fun run(): Result { + if (!SignalStore.backup.backsUpMedia) { + Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.") + return Result.success() + } + + val attachment: DatabaseAttachment? = SignalDatabase.attachments.getAttachment(attachmentId) + + if (attachment == null) { + Log.w(TAG, "[$attachmentId] Attachment no longer exists! Skipping.") + return Result.failure() + } + + if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED) { + Log.i(TAG, "[$attachmentId] Already finished. Skipping.") + return Result.success() + } + + if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) { + Log.i(TAG, "[$attachmentId] Already marked as a permanent failure. Skipping.") + return Result.failure() + } + + if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.NONE) { + Log.i(TAG, "[$attachmentId] Not marked as pending copy. Enqueueing an upload job instead.") + AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId)) + return Result.success() + } + + val result = when (val archiveResult = BackupRepository.archiveMedia(attachment)) { + is NetworkResult.Success -> { + Log.i(TAG, "[$attachmentId] Successfully copied the archive tier.") + Result.success() + } + + is NetworkResult.NetworkError -> { + Log.w(TAG, "[$attachmentId] Encountered a retryable network error.", archiveResult.exception) + Result.retry(defaultBackoff()) + } + + is NetworkResult.StatusCodeError -> { + when (archiveResult.code) { + 403 -> { + // TODO [backup] What is the best way to handle this UX-wise? + Log.w(TAG, "[$attachmentId] Insufficient permissions to upload. Is the user no longer on media tier?") + Result.success() + } + 410 -> { + Log.w(TAG, "[$attachmentId] The attachment no longer exists on the transit tier. Scheduling a re-upload.") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId)) + Result.success() + } + 413 -> { + // TODO [backup] What is the best way to handle this UX-wise? + Log.w(TAG, "[$attachmentId] Insufficient storage space! Can't upload!") + Result.success() + } + else -> { + Log.w(TAG, "[$attachmentId] Got back a non-2xx status code: ${archiveResult.code}. Retrying.") + Result.retry(defaultBackoff()) + } + } + } + + is NetworkResult.ApplicationError -> { + Log.w(TAG, "[$attachmentId] Encountered a fatal error when trying to upload!") + Result.fatalFailure(RuntimeException(archiveResult.throwable)) + } + } + + if (result.isSuccess) { + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) + + ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) + SignalStore.backup.usedBackupMediaSpace += AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(attachment.size)) + + incrementBackfillProgressIfNecessary() + } + + return result + } + + override fun onFailure() { + incrementBackfillProgressIfNecessary() + } + + private fun incrementBackfillProgressIfNecessary() { + if (!forBackfill) { + return + } + + if (SignalStore.backup.totalAttachmentUploadCount > 0) { + SignalStore.backup.currentAttachmentUploadCount++ + + if (SignalStore.backup.currentAttachmentUploadCount >= SignalStore.backup.totalAttachmentUploadCount) { + EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, count = 0, estimatedTotalCount = 0)) + SignalStore.backup.currentAttachmentUploadCount = 0 + SignalStore.backup.totalAttachmentUploadCount = 0 + } else { + EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, count = SignalStore.backup.currentAttachmentUploadCount, estimatedTotalCount = SignalStore.backup.totalAttachmentUploadCount)) + } + } + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): CopyAttachmentToArchiveJob { + val jobData = CopyAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!) + return CopyAttachmentToArchiveJob( + attachmentId = AttachmentId(jobData.attachmentId), + forBackfill = jobData.forBackfill, + parameters = parameters + ) + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 4bf58aa0e8..31e3f8e344 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -106,7 +106,7 @@ public final class JobManagerFactories { return new HashMap() {{ put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory()); put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory()); - put(ArchiveAttachmentJob.KEY, new ArchiveAttachmentJob.Factory()); + put(ApkUpdateJob.KEY, new ApkUpdateJob.Factory()); put(ArchiveAttachmentBackfillJob.KEY, new ArchiveAttachmentBackfillJob.Factory()); put(ArchiveThumbnailUploadJob.KEY, new ArchiveThumbnailUploadJob.Factory()); put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory()); @@ -134,6 +134,7 @@ public final class JobManagerFactories { put(ContactLinkRebuildMigrationJob.KEY, new ContactLinkRebuildMigrationJob.Factory()); put(ConversationShortcutRankingUpdateJob.KEY, new ConversationShortcutRankingUpdateJob.Factory()); put(ConversationShortcutUpdateJob.KEY, new ConversationShortcutUpdateJob.Factory()); + put(CopyAttachmentToArchiveJob.KEY, new CopyAttachmentToArchiveJob.Factory()); put(CreateReleaseChannelJob.KEY, new CreateReleaseChannelJob.Factory()); put(DirectoryRefreshJob.KEY, new DirectoryRefreshJob.Factory()); put(DonationReceiptRedemptionJob.KEY, new DonationReceiptRedemptionJob.Factory()); @@ -251,7 +252,7 @@ public final class JobManagerFactories { put(ThreadUpdateJob.KEY, new ThreadUpdateJob.Factory()); put(TrimThreadJob.KEY, new TrimThreadJob.Factory()); put(TypingSendJob.KEY, new TypingSendJob.Factory()); - put(ApkUpdateJob.KEY, new ApkUpdateJob.Factory()); + put(UploadAttachmentToArchiveJob.KEY, new UploadAttachmentToArchiveJob.Factory()); // Migrations put(AccountConsistencyMigrationJob.KEY, new AccountConsistencyMigrationJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt new file mode 100644 index 0000000000..ba135a1d5d --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -0,0 +1,204 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import org.signal.core.util.logging.Log +import org.signal.protos.resumableuploads.ResumableUpload +import org.thoughtcrime.securesms.attachments.AttachmentId +import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil +import org.thoughtcrime.securesms.attachments.DatabaseAttachment +import org.thoughtcrime.securesms.backup.v2.BackupRepository +import org.thoughtcrime.securesms.database.AttachmentTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.protos.UploadAttachmentToArchiveJobData +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.net.SignalNetwork +import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes +import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult +import java.io.IOException +import kotlin.time.Duration.Companion.days + +/** + * Given an attachmentId, this will upload the corresponding attachment to the archive cdn. + * To do this, it must first upload it to the attachment cdn, and then copy it to the archive cdn. + */ +class UploadAttachmentToArchiveJob private constructor( + private val attachmentId: AttachmentId, + private var uploadSpec: ResumableUpload?, + private val forBackfill: Boolean, + parameters: Parameters +) : Job(parameters) { + + companion object { + private val TAG = Log.tag(UploadAttachmentToArchiveJob::class) + const val KEY = "UploadAttachmentToArchiveJob" + + fun buildQueueKey(attachmentId: AttachmentId) = "ArchiveAttachmentJobs_${attachmentId.id}" + } + + constructor(attachmentId: AttachmentId, forBackfill: Boolean = false) : this( + attachmentId = attachmentId, + uploadSpec = null, + forBackfill = forBackfill, + parameters = Parameters.Builder() + .addConstraint(NetworkConstraint.KEY) + .setLifespan(30.days.inWholeMilliseconds) + .setMaxAttempts(Parameters.UNLIMITED) + .setQueue(buildQueueKey(attachmentId)) + .build() + ) + + override fun serialize(): ByteArray = UploadAttachmentToArchiveJobData( + attachmentId = attachmentId.id, + forBackfill = forBackfill + ).encode() + + override fun getFactoryKey(): String = KEY + + override fun onAdded() { + val transferStatus = SignalDatabase.attachments.getArchiveTransferState(attachmentId) ?: return + + if (transferStatus == AttachmentTable.ArchiveTransferState.NONE) { + Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS}") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) + } + } + + override fun run(): Result { + if (!SignalStore.backup.backsUpMedia) { + Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.") + return Result.success() + } + + val attachment: DatabaseAttachment? = SignalDatabase.attachments.getAttachment(attachmentId) + + if (attachment == null) { + Log.w(TAG, "[$attachmentId] Attachment no longer exists! Skipping.") + return Result.failure() + } + + if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.FINISHED) { + Log.i(TAG, "[$attachmentId] Already finished. Skipping.") + return Result.success() + } + + if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) { + Log.i(TAG, "[$attachmentId] Already marked as a permanent failure. Skipping.") + return Result.failure() + } + + if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.COPY_PENDING) { + Log.i(TAG, "[$attachmentId] Already marked as pending transfer. Enqueueing a copy job just in case.") + AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId, forBackfill = forBackfill)) + return Result.success() + } + + if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) { + Log.w(TAG, "[$attachmentId] Upload spec expired! Clearing.") + uploadSpec = null + } + + if (uploadSpec == null) { + Log.d(TAG, "[$attachmentId] Need an upload spec. Fetching...") + + val (spec, result) = fetchResumableUploadSpec() + if (result != null) { + return result + } + + uploadSpec = spec + } else { + Log.d(TAG, "[$attachmentId] Already have an upload spec. Continuing...") + } + + val attachmentStream = try { + AttachmentUploadUtil.buildSignalServiceAttachmentStream( + context = context, + attachment = attachment, + uploadSpec = uploadSpec!!, + cancellationSignal = { this.isCanceled } + ) + } catch (e: IOException) { + Log.e(TAG, "[$attachmentId] Failed to get attachment stream.", e) + return Result.retry(defaultBackoff()) + } + + Log.d(TAG, "[$attachmentId] Beginning upload...") + val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) { + is NetworkResult.Success -> result.result + is NetworkResult.ApplicationError -> throw result.throwable + is NetworkResult.NetworkError -> { + Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception) + return Result.retry(defaultBackoff()) + } + is NetworkResult.StatusCodeError -> { + Log.w(TAG, "[$attachmentId] Failed to upload due to status code error. Code: ${result.code}", result.exception) + return Result.retry(defaultBackoff()) + } + } + Log.d(TAG, "[$attachmentId] Upload complete!") + + SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult) + + AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId, forBackfill = forBackfill)) + + return Result.success() + } + + override fun onFailure() = Unit + + private fun fetchResumableUploadSpec(): Pair { + return when (val spec = BackupRepository.getMediaUploadSpec()) { + is NetworkResult.Success -> { + Log.d(TAG, "[$attachmentId] Got an upload spec!") + spec.result.toProto() to null + } + + is NetworkResult.ApplicationError -> { + Log.w(TAG, "[$attachmentId] Failed to get an upload spec due to an application error. Retrying.", spec.throwable) + return null to Result.retry(defaultBackoff()) + } + + is NetworkResult.NetworkError -> { + Log.w(TAG, "[$attachmentId] Encountered a transient network error. Retrying.") + return null to Result.retry(defaultBackoff()) + } + + is NetworkResult.StatusCodeError -> { + Log.w(TAG, "[$attachmentId] Failed request with status code ${spec.code}") + + when (ArchiveMediaUploadFormStatusCodes.from(spec.code)) { + ArchiveMediaUploadFormStatusCodes.BadArguments, + ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature, + ArchiveMediaUploadFormStatusCodes.InsufficientPermissions, + ArchiveMediaUploadFormStatusCodes.RateLimited -> { + return null to Result.retry(defaultBackoff()) + } + + ArchiveMediaUploadFormStatusCodes.Unknown -> { + return null to Result.retry(defaultBackoff()) + } + } + } + } + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): UploadAttachmentToArchiveJob { + val data = UploadAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!) + return UploadAttachmentToArchiveJob( + attachmentId = AttachmentId(data.attachmentId), + uploadSpec = data.uploadSpec, + forBackfill = data.forBackfill, + parameters = parameters + ) + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt index 7e4e571843..e69060e35b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt @@ -37,6 +37,9 @@ class BackupValues(store: KeyValueStore) : SignalStoreValues(store) { private const val KEY_OPTIMIZE_STORAGE = "backup.optimizeStorage" private const val KEY_BACKUPS_INITIALIZED = "backup.initialized" + private const val KEY_TOTAL_ATTACHMENTS_UPLOAD_COUNT = "backup.totalAttachmentsUploadCount" + private const val KEY_CURRENT_ATTACHMENT_UPLOAD_COUNT = "backup.currentAttachmentUploadCount" + /** * Specifies whether remote backups are enabled on this device. */ @@ -66,6 +69,16 @@ class BackupValues(store: KeyValueStore) : SignalStoreValues(store) { var backupFrequency: BackupFrequency by enumValue(KEY_BACKUP_FREQUENCY, BackupFrequency.MANUAL, BackupFrequency.Serializer) var backupTier: MessageBackupTier? by enumValue(KEY_BACKUP_TIER, null, MessageBackupTier.Serializer) + /** + * When uploading attachments to the archive CDN, this tracks the total number of attachments that are pending upload. + */ + var totalAttachmentUploadCount: Long by longValue(KEY_TOTAL_ATTACHMENTS_UPLOAD_COUNT, 0) + + /** + * When uploading attachments to the archive CDN, this tracks the total number of attachments that have currently been uploaded. + */ + var currentAttachmentUploadCount: Long by longValue(KEY_CURRENT_ATTACHMENT_UPLOAD_COUNT, 0) + val totalBackupSize: Long get() = lastBackupProtoSize + usedBackupMediaSpace /** True if the user backs up media, otherwise false. */ diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto index a708db2ca0..ee72c2a788 100644 --- a/app/src/main/protowire/JobData.proto +++ b/app/src/main/protowire/JobData.proto @@ -58,17 +58,6 @@ message PreKeysSyncJobData { bool forceRefreshRequested = 1; } -message ArchiveAttachmentJobData { - uint64 attachmentId = 1; -} - -message ArchiveAttachmentBackfillJobData { - optional uint64 attachmentId = 1; - ResumableUpload uploadSpec = 2; - optional uint32 count = 3; - optional uint32 totalCount = 4; -} - message ArchiveThumbnailUploadJobData { uint64 attachmentId = 1; } @@ -135,3 +124,14 @@ message RestoreAttachmentJobData { uint64 attachmentId = 2; bool offloaded = 3; } + +message CopyAttachmentToArchiveJobData { + uint64 attachmentId = 1; + bool forBackfill = 2; +} + +message UploadAttachmentToArchiveJobData { + uint64 attachmentId = 1; + ResumableUpload uploadSpec = 2; + bool forBackfill = 3; +} diff --git a/app/src/spinner/java/org/thoughtcrime/securesms/database/AttachmentTransformer.kt b/app/src/spinner/java/org/thoughtcrime/securesms/database/AttachmentTransformer.kt index 44f6bbdf7c..cfaa3ef593 100644 --- a/app/src/spinner/java/org/thoughtcrime/securesms/database/AttachmentTransformer.kt +++ b/app/src/spinner/java/org/thoughtcrime/securesms/database/AttachmentTransformer.kt @@ -10,12 +10,26 @@ import org.signal.core.util.requireInt import org.signal.spinner.ColumnTransformer object AttachmentTransformer : ColumnTransformer { + + val COLUMNS = setOf( + AttachmentTable.TRANSFER_STATE, + AttachmentTable.ARCHIVE_TRANSFER_STATE + ) + override fun matches(tableName: String?, columnName: String): Boolean { - return (tableName == AttachmentTable.TABLE_NAME || tableName == null) && columnName == AttachmentTable.TRANSFER_STATE + return (tableName == AttachmentTable.TABLE_NAME || tableName == null) && columnName in COLUMNS } - override fun transform(tableName: String?, columnName: String, cursor: Cursor): String? { - val value = cursor.requireInt(columnName) + override fun transform(tableName: String?, columnName: String, cursor: Cursor): String { + return when (columnName) { + AttachmentTable.TRANSFER_STATE -> return cursor.toTransferState() + AttachmentTable.ARCHIVE_TRANSFER_STATE -> return cursor.toArchiveTransferState() + else -> "UNKNOWN" + } + } + + private fun Cursor.toTransferState(): String { + val value = this.requireInt(AttachmentTable.TRANSFER_STATE) val string = when (value) { AttachmentTable.TRANSFER_PROGRESS_DONE -> "DONE" AttachmentTable.TRANSFER_PROGRESS_PENDING -> "PENDING" @@ -29,4 +43,9 @@ object AttachmentTransformer : ColumnTransformer { } return "$string ($value)" } + + private fun Cursor.toArchiveTransferState(): String { + val state = AttachmentTable.ArchiveTransferState.deserialize(this.requireInt(AttachmentTable.ARCHIVE_TRANSFER_STATE)) + return "${state.name} (${state.value})" + } } diff --git a/app/src/test/java/org/thoughtcrime/securesms/sms/UploadDependencyGraphTest.kt b/app/src/test/java/org/thoughtcrime/securesms/sms/UploadDependencyGraphTest.kt index 5703e7512c..79427baf02 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/sms/UploadDependencyGraphTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/sms/UploadDependencyGraphTest.kt @@ -263,6 +263,7 @@ class UploadDependencyGraphTest { archiveMediaName = null, archiveCdn = 0, thumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE, + archiveTransferState = AttachmentTable.ArchiveTransferState.NONE, uuid = null ) } diff --git a/app/src/testShared/org/thoughtcrime/securesms/database/FakeMessageRecords.kt b/app/src/testShared/org/thoughtcrime/securesms/database/FakeMessageRecords.kt index a7fc3eef4d..e4ac088a37 100644 --- a/app/src/testShared/org/thoughtcrime/securesms/database/FakeMessageRecords.kt +++ b/app/src/testShared/org/thoughtcrime/securesms/database/FakeMessageRecords.kt @@ -64,7 +64,8 @@ object FakeMessageRecords { archiveMediaName: String? = null, archiveMediaId: String? = null, archiveThumbnailId: String? = null, - thumbnailRestoreState: AttachmentTable.ThumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE + thumbnailRestoreState: AttachmentTable.ThumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE, + archiveTransferState: AttachmentTable.ArchiveTransferState = AttachmentTable.ArchiveTransferState.NONE ): DatabaseAttachment { return DatabaseAttachment( attachmentId = attachmentId, @@ -102,6 +103,7 @@ object FakeMessageRecords { archiveMediaName = archiveMediaId, archiveMediaId = archiveMediaName, thumbnailRestoreState = thumbnailRestoreState, + archiveTransferState = archiveTransferState, uuid = null ) } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt index 4aa3d58f4b..f033894861 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt @@ -87,6 +87,12 @@ class ArchiveApi(private val pushServiceSocket: PushServiceSocket) { /** * Fetches an upload form you can use to upload your main message backup file to cloud storage. + * + * Responses + * 200: Success + * 400: Bad args, or made on an authenticated channel + * 403: Insufficient permissions + * 429: Rate-limited */ fun getMessageBackupUploadForm(backupKey: BackupKey, aci: ACI, serviceCredential: ArchiveServiceCredential): NetworkResult { return NetworkResult.fromFetch { @@ -200,6 +206,7 @@ class ArchiveApi(private val pushServiceSocket: PushServiceSocket) { * 400: Bad arguments, or made on an authenticated channel * 401: Invalid presentation or signature * 403: Insufficient permissions + * 410: The source object was not found * 413: No media space remaining * 429: Rate-limited */ diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt index bd4050f0b1..5dde4752a3 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt @@ -5,16 +5,22 @@ package org.whispersystems.signalservice.api.crypto -class AttachmentCipherStreamUtil { - companion object { - @JvmStatic - fun getCiphertextLength(plaintextLength: Long): Long { - return 16 + (plaintextLength / 16 + 1) * 16 + 32 - } +object AttachmentCipherStreamUtil { - @JvmStatic - fun getPlaintextLength(ciphertextLength: Long): Long { - return ((ciphertextLength - 16 - 32) / 16 - 1) * 16 - } + /** + * Given the size of the plaintext, this will return the length of ciphertext output. + * @param inputSize Size of the plaintext fed into the stream. This does *not* automatically include padding. Add that yourself before calling if needed. + */ + @JvmStatic + fun getCiphertextLength(plaintextLength: Long): Long { + val ivLength: Long = 16 + val macLength: Long = 32 + val blockLength: Long = (plaintextLength / 16 + 1) * 16 + return ivLength + macLength + blockLength + } + + @JvmStatic + fun getPlaintextLength(ciphertextLength: Long): Long { + return ((ciphertextLength - 16 - 32) / 16 - 1) * 16 } } diff --git a/libsignal-service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtilTest.kt b/libsignal-service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtilTest.kt new file mode 100644 index 0000000000..8368824375 --- /dev/null +++ b/libsignal-service/src/test/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtilTest.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.signalservice.api.crypto + +import org.junit.Assert.assertEquals +import org.junit.Test +import org.signal.core.util.copyTo +import org.whispersystems.signalservice.internal.util.Util +import java.io.ByteArrayOutputStream + +class AttachmentCipherStreamUtilTest { + + @Test + fun `getCiphertextLength should return the correct length`() { + for (length in 0..1024) { + val plaintext = ByteArray(length).also { it.fill(0x42) } + val key = Util.getSecretBytes(64) + val iv = Util.getSecretBytes(16) + + val outputStream = ByteArrayOutputStream() + val cipherStream = AttachmentCipherOutputStream(key, iv, outputStream) + plaintext.inputStream().copyTo(cipherStream) + + val expected = AttachmentCipherStreamUtil.getCiphertextLength(length.toLong()) + val actual = outputStream.size().toLong() + + assertEquals(expected, actual) + } + } +}