diff --git a/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt b/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt new file mode 100644 index 0000000000..3423a8150a --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.attachments + +import android.content.Context +import android.graphics.Bitmap +import android.os.Build +import org.signal.core.util.logging.Log +import org.signal.protos.resumableuploads.ResumableUpload +import org.thoughtcrime.securesms.blurhash.BlurHashEncoder +import org.thoughtcrime.securesms.mms.PartAuthority +import org.thoughtcrime.securesms.util.MediaUtil +import org.whispersystems.signalservice.api.messages.SignalServiceAttachment +import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener +import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec +import java.io.IOException +import java.util.Objects + +/** + * A place collect common attachment upload operations to allow for code reuse. + */ +object AttachmentUploadUtil { + + private val TAG = Log.tag(AttachmentUploadUtil::class.java) + + /** + * Builds a [SignalServiceAttachmentStream] from the provided data, which can then be provided to various upload methods. + */ + @Throws(IOException::class) + fun buildSignalServiceAttachmentStream( + context: Context, + attachment: Attachment, + uploadSpec: ResumableUpload, + cancellationSignal: (() -> Boolean)? = null, + progressListener: ProgressListener? = null + ): SignalServiceAttachmentStream { + val inputStream = PartAuthority.getAttachmentStream(context, attachment.uri!!) + val builder = SignalServiceAttachment.newStreamBuilder() + .withStream(inputStream) + .withContentType(attachment.contentType) + .withLength(attachment.size) + .withFileName(attachment.fileName) + .withVoiceNote(attachment.voiceNote) + .withBorderless(attachment.borderless) + .withGif(attachment.videoGif) + .withFaststart(attachment.transformProperties?.mp4FastStart ?: false) + .withWidth(attachment.width) + .withHeight(attachment.height) + .withUploadTimestamp(System.currentTimeMillis()) + .withCaption(attachment.caption) + .withResumableUploadSpec(ResumableUploadSpec.from(uploadSpec)) + .withCancelationSignal(cancellationSignal) + .withListener(progressListener) + + if (MediaUtil.isImageType(attachment.contentType)) { + builder.withBlurHash(getImageBlurHash(context, attachment)) + } else if (MediaUtil.isVideoType(attachment.contentType)) { + builder.withBlurHash(getVideoBlurHash(context, attachment)) + } + + return builder.build() + } + + @Throws(IOException::class) + private fun getImageBlurHash(context: Context, attachment: Attachment): String? { + if (attachment.blurHash != null) { + return attachment.blurHash!!.hash + } + + if (attachment.uri == null) { + return null + } + + return PartAuthority.getAttachmentStream(context, attachment.uri!!).use { inputStream -> + BlurHashEncoder.encode(inputStream) + } + } + + @Throws(IOException::class) + private fun getVideoBlurHash(context: Context, attachment: Attachment): String? { + if (attachment.blurHash != null) { + return attachment.blurHash.hash + } + + if (Build.VERSION.SDK_INT < 23) { + Log.w(TAG, "Video thumbnails not supported...") + return null + } + + return MediaUtil.getVideoThumbnail(context, Objects.requireNonNull(attachment.uri), 1000)?.let { bitmap -> + val thumb = Bitmap.createScaledBitmap(bitmap, 100, 100, false) + bitmap.recycle() + + Log.i(TAG, "Generated video thumbnail...") + val hash = BlurHashEncoder.encode(thumb) + thumb.recycle() + + hash + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index 52cc26740f..8a1377abc8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -346,6 +346,10 @@ object BackupRepository { return api .triggerBackupIdReservation(backupKey) .then { getAuthCredential() } + .then { credential -> + api.setPublicKey(backupKey, credential) + .map { credential } + } .then { credential -> val mediaName = attachment.getMediaName() val request = attachment.toArchiveMediaRequest(mediaName, backupKey) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index e10954a671..f10387e5f5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -98,7 +98,6 @@ import java.security.NoSuchAlgorithmException import java.util.LinkedList import java.util.Optional import java.util.UUID -import kotlin.time.Duration.Companion.days class AttachmentTable( context: Context, @@ -147,6 +146,7 @@ class AttachmentTable( const val ARCHIVE_MEDIA_NAME = "archive_media_name" const val ARCHIVE_MEDIA_ID = "archive_media_id" const val ARCHIVE_TRANSFER_FILE = "archive_transfer_file" + const val ARCHIVE_TRANSFER_STATE = "archive_transfer_state" const val ATTACHMENT_JSON_ALIAS = "attachment_json" @@ -201,7 +201,8 @@ class AttachmentTable( ARCHIVE_TRANSFER_FILE ) - const val CREATE_TABLE = """ + @JvmField + val CREATE_TABLE = """ CREATE TABLE $TABLE_NAME ( $ID INTEGER PRIMARY KEY AUTOINCREMENT, $MESSAGE_ID INTEGER, @@ -239,7 +240,8 @@ class AttachmentTable( $ARCHIVE_CDN INTEGER DEFAULT 0, $ARCHIVE_MEDIA_NAME TEXT DEFAULT NULL, $ARCHIVE_MEDIA_ID TEXT DEFAULT NULL, - $ARCHIVE_TRANSFER_FILE TEXT DEFAULT NULL + $ARCHIVE_TRANSFER_FILE TEXT DEFAULT NULL, + $ARCHIVE_TRANSFER_STATE INTEGER DEFAULT ${ArchiveTransferState.NONE.value} ) """ @@ -254,8 +256,6 @@ class AttachmentTable( "CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON $TABLE_NAME ($ARCHIVE_MEDIA_ID);" ) - val ATTACHMENT_POINTER_REUSE_THRESHOLD = 7.days.inWholeMilliseconds - @JvmStatic @Throws(IOException::class) fun newDataFile(context: Context): File { @@ -426,6 +426,78 @@ class AttachmentTable( }.flatten() } + /** + * Finds the next eligible attachment that needs to be uploaded to the archive service. + * If it exists, it'll also atomically be marked as [ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS]. + */ + fun getNextAttachmentToArchiveAndMarkUploadInProgress(): DatabaseAttachment? { + return writableDatabase.withinTransaction { + val record: DatabaseAttachment? = readableDatabase + .select(*PROJECTION) + .from(TABLE_NAME) + .where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value) + .orderBy("$ID DESC") + .limit(1) + .run() + .readToSingleObject { it.readAttachment() } + + if (record != null) { + writableDatabase + .update(TABLE_NAME) + .values(ARCHIVE_TRANSFER_STATE to ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS.value) + .where("$ID = ?", record.attachmentId) + .run() + } + + record + } + } + + /** + * Returns the current archive transfer state, if the attachment can be found. + */ + fun getArchiveTransferState(id: AttachmentId): ArchiveTransferState? { + return readableDatabase + .select(ARCHIVE_TRANSFER_STATE) + .from(TABLE_NAME) + .where("$ID = ?", id.id) + .run() + .readToSingleObject { ArchiveTransferState.deserialize(it.requireInt(ARCHIVE_TRANSFER_STATE)) } + } + + /** + * Sets the archive transfer state for the given attachment and all other attachments that share the same data file. + */ + fun setArchiveTransferState(id: AttachmentId, state: ArchiveTransferState) { + writableDatabase.withinTransaction { + val dataFile: String = readableDatabase + .select(DATA_FILE) + .from(TABLE_NAME) + .where("$ID = ?", id.id) + .run() + .readToSingleObject { it.requireString(DATA_FILE) } ?: return@withinTransaction + + writableDatabase + .update(TABLE_NAME) + .values(ARCHIVE_TRANSFER_STATE to state.value) + .where("$DATA_FILE = ?", dataFile) + .run() + } + } + + /** + * Resets any in-progress archive backfill states to [ArchiveTransferState.NONE], returning the number that had to be reset. + * This should only be called if you believe the backfill process has finished. In this case, if this returns a value > 0, + * it indicates that state was mis-tracked and you should try uploading again. + */ + fun resetPendingArchiveBackfills(): Int { + return writableDatabase + .update(TABLE_NAME) + .values(ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value) + .where("$ARCHIVE_TRANSFER_STATE == ${ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS.value} || $ARCHIVE_TRANSFER_STATE == ${ArchiveTransferState.BACKFILL_UPLOADED.value}") + .run() + } + fun deleteAttachmentsForMessage(mmsId: Long): Boolean { Log.d(TAG, "[deleteAttachmentsForMessage] mmsId: $mmsId") @@ -1992,4 +2064,44 @@ class AttachmentTable( } } } + + /** + * This maintains two different state paths for uploading attachments to the archive. + * + * The first is the backfill process, which will happen after newly-enabling backups. That process will go: + * 1. [NONE] + * 2. [BACKFILL_UPLOAD_IN_PROGRESS] + * 3. [BACKFILL_UPLOADED] + * 4. [FINISHED] or [PERMANENT_FAILURE] + * + * The second is when newly sending/receiving an attachment after enabling backups. That process will go: + * 1. [NONE] + * 2. [ATTACHMENT_TRANSFER_PENDING] + * 3. [FINISHED] or [PERMANENT_FAILURE] + */ + enum class ArchiveTransferState(val value: Int) { + /** Not backed up at all. */ + NONE(0), + + /** The upload to the attachment service is in progress. */ + BACKFILL_UPLOAD_IN_PROGRESS(1), + + /** Successfully uploaded to the attachment service during the backfill process. Still need to tell the service to move the file over to the archive service. */ + BACKFILL_UPLOADED(2), + + /** Completely finished backing up the attachment. */ + FINISHED(3), + + /** It is impossible to upload this attachment. */ + PERMANENT_FAILURE(4), + + /** We sent/received this attachment after enabling backups, but still need to transfer the file to the archive service. */ + ATTACHMENT_TRANSFER_PENDING(5); + + companion object { + fun deserialize(value: Int): ArchiveTransferState { + return values().firstOrNull { it.value == value } ?: NONE + } + } + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt index 9cffce8811..17dabc9d4d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt @@ -84,6 +84,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V223_AddNicknameAnd import org.thoughtcrime.securesms.database.helpers.migration.V224_AddAttachmentArchiveColumns import org.thoughtcrime.securesms.database.helpers.migration.V225_AddLocalUserJoinedStateAndGroupCallActiveState import org.thoughtcrime.securesms.database.helpers.migration.V226_AddAttachmentMediaIdIndex +import org.thoughtcrime.securesms.database.helpers.migration.V227_AddAttachmentArchiveTransferState /** * Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness. @@ -170,10 +171,11 @@ object SignalDatabaseMigrations { 223 to V223_AddNicknameAndNoteFieldsToRecipientTable, 224 to V224_AddAttachmentArchiveColumns, 225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState, - 226 to V226_AddAttachmentMediaIdIndex + 226 to V226_AddAttachmentMediaIdIndex, + 227 to V227_AddAttachmentArchiveTransferState ) - const val DATABASE_VERSION = 226 + const val DATABASE_VERSION = 227 @JvmStatic fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V227_AddAttachmentArchiveTransferState.kt b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V227_AddAttachmentArchiveTransferState.kt new file mode 100644 index 0000000000..db664232e0 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V227_AddAttachmentArchiveTransferState.kt @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.database.helpers.migration + +import android.app.Application +import net.zetetic.database.sqlcipher.SQLiteDatabase + +/** + * Adds a new column to track the status of transferring attachments to the archive service. + */ +object V227_AddAttachmentArchiveTransferState : SignalDatabaseMigration { + + override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) { + db.execSQL("ALTER TABLE attachment ADD COLUMN archive_transfer_state INTEGER DEFAULT 0") + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobExtensions.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobExtensions.kt deleted file mode 100644 index 525b7b4e9b..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobExtensions.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.thoughtcrime.securesms.jobmanager - -import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil -import org.thoughtcrime.securesms.util.FeatureFlags - -/** - * Helper to calculate the default backoff interval for a [Job] given it's run attempt count. - */ -fun Job.defaultBackoffInterval(): Long = BackoffUtil.exponentialBackoff(runAttempt + 1, FeatureFlags.getDefaultMaxBackoff()) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt new file mode 100644 index 0000000000..71436b32f6 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt @@ -0,0 +1,226 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import org.signal.core.util.logging.Log +import org.signal.protos.resumableuploads.ResumableUpload +import org.thoughtcrime.securesms.attachments.Attachment +import org.thoughtcrime.securesms.attachments.AttachmentId +import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil +import org.thoughtcrime.securesms.attachments.DatabaseAttachment +import org.thoughtcrime.securesms.attachments.PointerAttachment +import org.thoughtcrime.securesms.backup.v2.BackupRepository +import org.thoughtcrime.securesms.database.AttachmentTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentBackfillJobData +import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.archive.ArchiveMediaResponse +import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer +import java.io.IOException +import java.util.Optional +import kotlin.time.Duration.Companion.days + +/** + * When run, this will find the next attachment that needs to be uploaded to the archive service and upload it. + * It will enqueue a copy of itself if it thinks there is more work to be done, and that copy will continue the upload process. + */ +class ArchiveAttachmentBackfillJob private constructor( + parameters: Parameters, + private var attachmentId: AttachmentId?, + private var uploadSpec: ResumableUpload? +) : Job(parameters) { + companion object { + private val TAG = Log.tag(ArchiveAttachmentBackfillJob::class.java) + + const val KEY = "ArchiveAttachmentBackfillJob" + } + + constructor() : this( + parameters = Parameters.Builder() + .setQueue("ArchiveAttachmentBackfillJob") + .setMaxInstancesForQueue(2) + .setLifespan(30.days.inWholeMilliseconds) + .setMaxAttempts(Parameters.UNLIMITED) + .addConstraint(NetworkConstraint.KEY) + .build(), + attachmentId = null, + uploadSpec = null + ) + + override fun serialize(): ByteArray { + return ArchiveAttachmentBackfillJobData( + attachmentId = attachmentId?.id, + uploadSpec = uploadSpec + ).encode() + } + + override fun getFactoryKey(): String = KEY + + override fun run(): Result { + var attachmentRecord: DatabaseAttachment? = if (attachmentId != null) { + Log.i(TAG, "Retrying $attachmentId") + SignalDatabase.attachments.getAttachment(attachmentId!!) + } else { + SignalDatabase.attachments.getNextAttachmentToArchiveAndMarkUploadInProgress() + } + + if (attachmentRecord == null && attachmentId != null) { + Log.w(TAG, "Attachment $attachmentId was not found! Was likely deleted during the process of archiving. Re-enqueuing job with no ID.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + return Result.success() + } + + // TODO [backup] If we ever wanted to allow multiple instances of this job to run in parallel, this would have to be done somewhere else + if (attachmentRecord == null) { + Log.i(TAG, "No more attachments to backfill! Ensuring there's no dangling state.") + + val resetCount = SignalDatabase.attachments.resetPendingArchiveBackfills() + if (resetCount > 0) { + Log.w(TAG, "We thought we were done, but $resetCount items were still in progress! Need to run again to retry.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + } else { + Log.i(TAG, "All good! Should be done.") + } + + return Result.success() + } + + attachmentId = attachmentRecord.attachmentId + + val transferState: AttachmentTable.ArchiveTransferState? = SignalDatabase.attachments.getArchiveTransferState(attachmentRecord.attachmentId) + if (transferState == null) { + Log.w(TAG, "Attachment $attachmentId was not found when looking for the transfer state! Was likely just deleted. Re-enqueuing job with no ID.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + return Result.success() + } + + Log.i(TAG, "Current state: $transferState") + + if (transferState == AttachmentTable.ArchiveTransferState.FINISHED) { + Log.i(TAG, "Attachment $attachmentId is already finished. Skipping.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + return Result.success() + } + + if (transferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) { + Log.i(TAG, "Attachment $attachmentId is already marked as a permanent failure. Skipping.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + return Result.success() + } + + if (transferState == AttachmentTable.ArchiveTransferState.ATTACHMENT_TRANSFER_PENDING) { + Log.i(TAG, "Attachment $attachmentId is already marked as pending transfer, meaning it's a send attachment that will be uploaded on it's own. Skipping.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + return Result.success() + } + + if (transferState == AttachmentTable.ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS) { + if (uploadSpec == null || System.currentTimeMillis() > uploadSpec!!.timeout) { + Log.d(TAG, "Need an upload spec. Fetching...") + uploadSpec = ApplicationDependencies.getSignalServiceMessageSender().getResumableUploadSpec().toProto() + } else { + Log.d(TAG, "Already have an upload spec. Continuing...") + } + + val attachmentStream = try { + AttachmentUploadUtil.buildSignalServiceAttachmentStream( + context = context, + attachment = attachmentRecord, + uploadSpec = uploadSpec!!, + cancellationSignal = { this.isCanceled } + ) + } catch (e: IOException) { + Log.e(TAG, "Failed to get attachment stream for $attachmentId", e) + return Result.retry(defaultBackoff()) + } + + Log.d(TAG, "Beginning upload...") + val remoteAttachment: SignalServiceAttachmentPointer = try { + ApplicationDependencies.getSignalServiceMessageSender().uploadAttachment(attachmentStream) + } catch (e: IOException) { + Log.w(TAG, "Failed to upload $attachmentId", e) + return Result.retry(defaultBackoff()) + } + Log.d(TAG, "Upload complete!") + + val pointerAttachment: Attachment = PointerAttachment.forPointer(Optional.of(remoteAttachment), null, attachmentRecord.fastPreflightId).get() + SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachmentRecord.attachmentId, pointerAttachment, remoteAttachment.uploadTimestamp) + SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.BACKFILL_UPLOADED) + + attachmentRecord = SignalDatabase.attachments.getAttachment(attachmentRecord.attachmentId) + } + + if (attachmentRecord == null) { + Log.w(TAG, "$attachmentId was not found after uploading! Possibly deleted in a narrow race condition. Re-enqueuing job with no ID.") + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + return Result.success() + } + + Log.d(TAG, "Moving attachment to archive...") + return when (val result = BackupRepository.archiveMedia(attachmentRecord)) { + is NetworkResult.Success -> { + Log.d(TAG, "Move complete!") + + SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.FINISHED) + ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob()) + Result.success() + } + + is NetworkResult.ApplicationError -> { + Log.w(TAG, "Failed to archive ${attachmentRecord.attachmentId} due to an application error. Retrying.", result.throwable) + Result.retry(defaultBackoff()) + } + + is NetworkResult.NetworkError -> { + Log.w(TAG, "Encountered a transient network error. Retrying.") + Result.retry(defaultBackoff()) + } + + is NetworkResult.StatusCodeError -> { + Log.w(TAG, "Failed request with status code ${result.code} for ${attachmentRecord.attachmentId}") + + when (ArchiveMediaResponse.StatusCodes.from(result.code)) { + ArchiveMediaResponse.StatusCodes.BadArguments, + ArchiveMediaResponse.StatusCodes.InvalidPresentationOrSignature, + ArchiveMediaResponse.StatusCodes.InsufficientPermissions, + ArchiveMediaResponse.StatusCodes.RateLimited -> { + Result.retry(defaultBackoff()) + } + + ArchiveMediaResponse.StatusCodes.NoMediaSpaceRemaining -> { + // TODO [backup] This will end the process right away. We need to integrate this with client-driven retry UX. + Result.failure() + } + + ArchiveMediaResponse.StatusCodes.Unknown -> { + Result.retry(defaultBackoff()) + } + } + } + } + } + + override fun onFailure() { + attachmentId?.let { id -> + Log.w(TAG, "Failed to archive $id!") + } + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentBackfillJob { + val data = serializedData?.let { ArchiveAttachmentBackfillJobData.ADAPTER.decode(it) } + + return ArchiveAttachmentBackfillJob( + parameters = parameters, + attachmentId = data?.attachmentId?.let { AttachmentId(it) }, + uploadSpec = data?.uploadSpec + ) + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentHashBackfillJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentHashBackfillJob.kt index a4178f470c..be32017fe2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentHashBackfillJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentHashBackfillJob.kt @@ -12,7 +12,6 @@ import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.jobmanager.Job -import org.thoughtcrime.securesms.jobmanager.defaultBackoffInterval import java.io.File import java.io.FileNotFoundException import java.io.IOException @@ -84,7 +83,7 @@ class AttachmentHashBackfillJob private constructor(parameters: Parameters) : Jo Log.w(TAG, "Underlying cause was a FileNotFoundException. Clearing all usages.", true) SignalDatabase.attachments.clearUsagesOfDataFile(file) } else { - return Result.retry(defaultBackoffInterval()) + return Result.retry(defaultBackoff()) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt index e8af9f7247..c6097ef5fe 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt @@ -4,8 +4,6 @@ */ package org.thoughtcrime.securesms.jobs -import android.graphics.Bitmap -import android.os.Build import android.text.TextUtils import org.greenrobot.eventbus.EventBus import org.signal.core.util.inRoundedDays @@ -15,8 +13,8 @@ import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.attachments.Attachment import org.thoughtcrime.securesms.attachments.AttachmentId +import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil import org.thoughtcrime.securesms.attachments.PointerAttachment -import org.thoughtcrime.securesms.blurhash.BlurHashEncoder import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.ApplicationDependencies @@ -26,20 +24,16 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData import org.thoughtcrime.securesms.mms.MmsException -import org.thoughtcrime.securesms.mms.PartAuthority import org.thoughtcrime.securesms.net.NotPushRegisteredException import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.service.AttachmentProgressService import org.thoughtcrime.securesms.util.FeatureFlags -import org.thoughtcrime.securesms.util.MediaUtil import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException import org.whispersystems.signalservice.internal.crypto.PaddingInputStream -import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec import java.io.IOException -import java.util.Objects import java.util.Optional import java.util.concurrent.TimeUnit import kotlin.time.Duration.Companion.days @@ -210,23 +204,12 @@ class AttachmentUploadJob private constructor( } return try { - val inputStream = PartAuthority.getAttachmentStream(context, attachment.uri!!) - val builder = SignalServiceAttachment.newStreamBuilder() - .withStream(inputStream) - .withContentType(attachment.contentType) - .withLength(attachment.size) - .withFileName(attachment.fileName) - .withVoiceNote(attachment.voiceNote) - .withBorderless(attachment.borderless) - .withGif(attachment.videoGif) - .withFaststart(attachment.transformProperties?.mp4FastStart ?: false) - .withWidth(attachment.width) - .withHeight(attachment.height) - .withUploadTimestamp(System.currentTimeMillis()) - .withCaption(attachment.caption) - .withResumableUploadSpec(ResumableUploadSpec.from(resumableUploadSpec)) - .withCancelationSignal { this.isCanceled } - .withListener(object : SignalServiceAttachment.ProgressListener { + AttachmentUploadUtil.buildSignalServiceAttachmentStream( + context = context, + attachment = attachment, + uploadSpec = resumableUploadSpec, + cancellationSignal = { isCanceled }, + progressListener = object : SignalServiceAttachment.ProgressListener { override fun onAttachmentProgress(total: Long, progress: Long) { EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) notification?.progress = (progress.toFloat() / total) @@ -235,58 +218,13 @@ class AttachmentUploadJob private constructor( override fun shouldCancel(): Boolean { return isCanceled } - }) - - if (MediaUtil.isImageType(attachment.contentType)) { - builder.withBlurHash(getImageBlurHash(attachment)).build() - } else if (MediaUtil.isVideoType(attachment.contentType)) { - builder.withBlurHash(getVideoBlurHash(attachment)).build() - } else { - builder.build() - } + } + ) } catch (e: IOException) { throw InvalidAttachmentException(e) } } - @Throws(IOException::class) - private fun getImageBlurHash(attachment: Attachment): String? { - if (attachment.blurHash != null) { - return attachment.blurHash!!.hash - } - - if (attachment.uri == null) { - return null - } - - return PartAuthority.getAttachmentStream(context, attachment.uri!!).use { inputStream -> - BlurHashEncoder.encode(inputStream) - } - } - - @Throws(IOException::class) - private fun getVideoBlurHash(attachment: Attachment): String? { - if (attachment.blurHash != null) { - return attachment.blurHash!!.hash - } - - if (Build.VERSION.SDK_INT < 23) { - Log.w(TAG, "Video thumbnails not supported...") - return null - } - - return MediaUtil.getVideoThumbnail(context, Objects.requireNonNull(attachment.uri), 1000)?.let { bitmap -> - val thumb = Bitmap.createScaledBitmap(bitmap, 100, 100, false) - bitmap.recycle() - - Log.i(TAG, "Generated video thumbnail...") - val hash = BlurHashEncoder.encode(thumb) - thumb.recycle() - - hash - } - } - private inner class InvalidAttachmentException : Exception { constructor(message: String?) : super(message) constructor(e: Exception?) : super(e) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 01553fac8f..d79a476d69 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -101,6 +101,7 @@ public final class JobManagerFactories { put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory()); put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory()); put(ArchiveAttachmentJob.KEY, new ArchiveAttachmentJob.Factory()); + put(ArchiveAttachmentBackfillJob.KEY, new ArchiveAttachmentBackfillJob.Factory()); put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory()); put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory()); put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt index 35a6e7cf0f..fb79eda0d5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt @@ -269,7 +269,7 @@ class PreKeysSyncJob private constructor( return when (result) { is NetworkResult.Success -> true - is NetworkResult.NetworkError -> throw result.throwable ?: PushNetworkException("Network error") + is NetworkResult.NetworkError -> throw result.exception ?: PushNetworkException("Network error") is NetworkResult.ApplicationError -> throw result.throwable is NetworkResult.StatusCodeError -> if (result.code == 409) { false diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto index 94b1a1dfd1..29964bc2dc 100644 --- a/app/src/main/protowire/JobData.proto +++ b/app/src/main/protowire/JobData.proto @@ -50,4 +50,9 @@ message PreKeysSyncJobData { message ArchiveAttachmentJobData { uint64 attachmentId = 1; +} + +message ArchiveAttachmentBackfillJobData { + optional uint64 attachmentId = 1; + ResumableUpload uploadSpec = 2; } \ No newline at end of file diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt index 12981b17df..3d741d9295 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt @@ -6,7 +6,6 @@ package org.whispersystems.signalservice.api import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException -import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException import java.io.IOException /** @@ -33,7 +32,7 @@ sealed class NetworkResult { fun fromFetch(fetch: () -> T): NetworkResult = try { Success(fetch()) } catch (e: NonSuccessfulResponseCodeException) { - StatusCodeError(e.code, e) + StatusCodeError(e.code, e.body, e) } catch (e: IOException) { NetworkError(e) } catch (e: Throwable) { @@ -45,10 +44,10 @@ sealed class NetworkResult { data class Success(val result: T) : NetworkResult() /** Indicates a generic network error occurred before we were able to process a response. */ - data class NetworkError(val throwable: Throwable? = null) : NetworkResult() + data class NetworkError(val exception: IOException) : NetworkResult() /** Indicates we got a response, but it was a non-2xx response. */ - data class StatusCodeError(val code: Int, val throwable: Throwable? = null) : NetworkResult() + data class StatusCodeError(val code: Int, val body: String?, val exception: IOException) : NetworkResult() /** Indicates that the application somehow failed in a way unrelated to network activity. Usually a runtime crash. */ data class ApplicationError(val throwable: Throwable) : NetworkResult() @@ -59,8 +58,8 @@ sealed class NetworkResult { fun successOrThrow(): T { when (this) { is Success -> return result - is NetworkError -> throw throwable ?: PushNetworkException("Network error") - is StatusCodeError -> throw throwable ?: NonSuccessfulResponseCodeException(this.code) + is NetworkError -> throw exception + is StatusCodeError -> throw exception is ApplicationError -> throw throwable } } @@ -72,8 +71,8 @@ sealed class NetworkResult { fun map(transform: (T) -> R): NetworkResult { return when (this) { is Success -> Success(transform(this.result)) - is NetworkError -> NetworkError(throwable) - is StatusCodeError -> StatusCodeError(code, throwable) + is NetworkError -> NetworkError(exception) + is StatusCodeError -> StatusCodeError(code, body, exception) is ApplicationError -> ApplicationError(throwable) } } @@ -85,8 +84,8 @@ sealed class NetworkResult { fun then(result: (T) -> NetworkResult): NetworkResult { return when (this) { is Success -> result(this.result) - is NetworkError -> NetworkError(throwable) - is StatusCodeError -> StatusCodeError(code, throwable) + is NetworkError -> NetworkError(exception) + is StatusCodeError -> StatusCodeError(code, body, exception) is ApplicationError -> ApplicationError(throwable) } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt index 04ac52f092..e66596915b 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt @@ -176,6 +176,13 @@ class ArchiveApi( /** * Copy and re-encrypt media from the attachments cdn into the backup cdn. + * + * Possible errors: + * 400: Bad arguments, or made on an authenticated channel + * 401: Invalid presentation or signature + * 403: Insufficient permissions + * 413: No media space remaining + * 429: Rate-limited */ fun archiveAttachmentMedia( backupKey: BackupKey, diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveMediaResponse.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveMediaResponse.kt index 7cdfa650dd..9fe8ae4427 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveMediaResponse.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveMediaResponse.kt @@ -12,4 +12,19 @@ import com.fasterxml.jackson.annotation.JsonProperty */ class ArchiveMediaResponse( @JsonProperty val cdn: Int -) +) { + enum class StatusCodes(val code: Int) { + BadArguments(400), + InvalidPresentationOrSignature(401), + InsufficientPermissions(403), + NoMediaSpaceRemaining(413), + RateLimited(429), + Unknown(-1); + + companion object { + fun from(code: Int): StatusCodes { + return values().firstOrNull { it.code == code } ?: Unknown + } + } + } +} diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/NonSuccessfulResponseCodeException.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/NonSuccessfulResponseCodeException.java index df4e20cee1..c6c7090f60 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/NonSuccessfulResponseCodeException.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/push/exceptions/NonSuccessfulResponseCodeException.java @@ -13,16 +13,25 @@ import java.io.IOException; */ public class NonSuccessfulResponseCodeException extends IOException { - private final int code; + private final int code; + private final String body; public NonSuccessfulResponseCodeException(int code) { super("StatusCode: " + code); this.code = code; + this.body = null; } public NonSuccessfulResponseCodeException(int code, String s) { super("[" + code + "] " + s); this.code = code; + this.body = null; + } + + public NonSuccessfulResponseCodeException(int code, String s, String body) { + super("[" + code + "] " + s); + this.code = code; + this.body = body; } public int getCode() { @@ -36,4 +45,8 @@ public class NonSuccessfulResponseCodeException extends IOException { public boolean is5xx() { return code >= 500 && code < 600; } + + public String getBody() { + return body; + } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index 1f274f8edf..c3425b118c 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -325,8 +325,9 @@ public class PushServiceSocket { private static final String CALL_LINK_CREATION_AUTH = "/v1/call-link/create-auth"; private static final String SERVER_DELIVERED_TIMESTAMP_HEADER = "X-Signal-Timestamp"; - private static final Map NO_HEADERS = Collections.emptyMap(); - private static final ResponseCodeHandler NO_HANDLER = new EmptyResponseCodeHandler(); + private static final Map NO_HEADERS = Collections.emptyMap(); + private static final ResponseCodeHandler NO_HANDLER = new EmptyResponseCodeHandler(); + private static final ResponseCodeHandler UNOPINIONATED_HANDER = new UnopinionatedResponseCodeHandler(); private static final long CDN2_RESUMABLE_LINK_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(7); @@ -494,14 +495,14 @@ public class PushServiceSocket { long secondsRoundedToNearestDay = TimeUnit.DAYS.toSeconds(TimeUnit.MILLISECONDS.toDays(currentTime)); long endTimeInSeconds = secondsRoundedToNearestDay + TimeUnit.DAYS.toSeconds(7); - String response = makeServiceRequest(String.format(Locale.US, ARCHIVE_CREDENTIALS, secondsRoundedToNearestDay, endTimeInSeconds), "GET", null); + String response = makeServiceRequest(String.format(Locale.US, ARCHIVE_CREDENTIALS, secondsRoundedToNearestDay, endTimeInSeconds), "GET", null, NO_HEADERS, UNOPINIONATED_HANDER, Optional.empty()); return JsonUtil.fromJson(response, ArchiveServiceCredentialsResponse.class); } public void setArchiveBackupId(BackupAuthCredentialRequest request) throws IOException { String body = JsonUtil.toJson(new ArchiveSetBackupIdRequest(request)); - makeServiceRequest(ARCHIVE_BACKUP_ID, "PUT", body); + makeServiceRequest(ARCHIVE_BACKUP_ID, "PUT", body, NO_HEADERS, UNOPINIONATED_HANDER, Optional.empty()); } public void setArchivePublicKey(ECPublicKey publicKey, ArchiveCredentialPresentation credentialPresentation) throws IOException { @@ -555,7 +556,7 @@ public class PushServiceSocket { public ArchiveMediaResponse archiveAttachmentMedia(@Nonnull ArchiveCredentialPresentation credentialPresentation, @Nonnull ArchiveMediaRequest request) throws IOException { Map headers = credentialPresentation.toHeaders(); - String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA, "PUT", JsonUtil.toJson(request), headers, NO_HANDLER); + String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA, "PUT", JsonUtil.toJson(request), headers, UNOPINIONATED_HANDER); return JsonUtil.fromJson(response, ArchiveMediaResponse.class); } @@ -566,7 +567,7 @@ public class PushServiceSocket { public BatchArchiveMediaResponse archiveAttachmentMedia(@Nonnull ArchiveCredentialPresentation credentialPresentation, @Nonnull BatchArchiveMediaRequest request) throws IOException { Map headers = credentialPresentation.toHeaders(); - String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA_BATCH, "PUT", JsonUtil.toJson(request), headers, NO_HANDLER); + String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA_BATCH, "PUT", JsonUtil.toJson(request), headers, UNOPINIONATED_HANDER); return JsonUtil.fromJson(response, BatchArchiveMediaResponse.class); } @@ -2660,6 +2661,28 @@ public class PushServiceSocket { public void handle(int responseCode, ResponseBody body) { } } + /** + * A {@link ResponseCodeHandler} that only throws {@link NonSuccessfulResponseCodeException} with the response body. + * Any further processing is left to the caller. + */ + private static class UnopinionatedResponseCodeHandler implements ResponseCodeHandler { + @Override + public void handle(int responseCode, ResponseBody body) throws NonSuccessfulResponseCodeException, PushNetworkException { + if (responseCode < 200 || responseCode > 299) { + String bodyString = null; + if (body != null) { + try { + bodyString = readBodyString(body); + } catch (MalformedResponseException e) { + Log.w(TAG, "Failed to read body string", e); + } + } + + throw new NonSuccessfulResponseCodeException(responseCode, "Response: " + responseCode, bodyString); + } + } + } + public enum ClientSet { KeyBackup } public CredentialResponse retrieveGroupsV2Credentials(long todaySeconds)