diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTableTest.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTableTest.kt index 9ae96352ec..280d19079c 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTableTest.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTableTest.kt @@ -182,7 +182,7 @@ class BackupMediaSnapshotTableTest { val notFound = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(remoteData) assertThat(notFound.size).isEqualTo(1) - assertThat(notFound.first().cdn).isEqualTo(2) + assertThat(notFound.first()).isEqualTo(remoteData[1]) } private fun getTotalItemCount(): Int { diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index c0b72ce0e7..d9f6c358b4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -290,6 +290,10 @@ class AttachmentTable( "CREATE INDEX IF NOT EXISTS attachment_remote_digest_index ON $TABLE_NAME ($REMOTE_DIGEST);" ) + private val DATA_FILE_INFO_PROJECTION = arrayOf( + ID, DATA_FILE, DATA_SIZE, DATA_RANDOM, DATA_HASH_START, DATA_HASH_END, TRANSFORM_PROPERTIES, UPLOAD_TIMESTAMP, ARCHIVE_CDN, ARCHIVE_TRANSFER_STATE, THUMBNAIL_FILE, THUMBNAIL_RESTORE_STATE, THUMBNAIL_RANDOM + ) + @JvmStatic @Throws(IOException::class) fun newDataFile(context: Context): File { @@ -571,7 +575,7 @@ class AttachmentTable( return readableDatabase .select(ID) .from(TABLE_NAME) - .where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value) + .where("($ARCHIVE_TRANSFER_STATE = ? or $ARCHIVE_TRANSFER_STATE = ?) AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value, ArchiveTransferState.TEMPORARY_FAILURE.value) .orderBy("$ID DESC") .run() .readToList { AttachmentId(it.requireLong(ID)) } @@ -636,7 +640,7 @@ class AttachmentTable( fun doAnyAttachmentsNeedArchiveUpload(): Boolean { return readableDatabase .exists(TABLE_NAME) - .where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value) + .where("($ARCHIVE_TRANSFER_STATE = ? OR $ARCHIVE_TRANSFER_STATE = ?) AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value, ArchiveTransferState.TEMPORARY_FAILURE.value) .run() } @@ -672,6 +676,27 @@ class AttachmentTable( } } + /** + * Sets the archive transfer state for the given attachment and all other attachments that share the same data file iff + * the row isn't already marked as a [ArchiveTransferState.PERMANENT_FAILURE]. + */ + fun setArchiveTransferStateFailure(id: AttachmentId, state: ArchiveTransferState) { + writableDatabase.withinTransaction { + val dataFile: String = readableDatabase + .select(DATA_FILE) + .from(TABLE_NAME) + .where("$ID = ?", id.id) + .run() + .readToSingleObject { it.requireString(DATA_FILE) } ?: return@withinTransaction + + writableDatabase + .update(TABLE_NAME) + .values(ARCHIVE_TRANSFER_STATE to state.value) + .where("$ARCHIVE_TRANSFER_STATE != ? AND $DATA_FILE = ?", ArchiveTransferState.PERMANENT_FAILURE.value, dataFile) + .run() + } + } + /** * Sets the archive transfer state for the given attachment by digest. */ @@ -1180,7 +1205,7 @@ class AttachmentTable( // We don't look at hash_start here because that could result in us matching on a file that got compressed down to something smaller, effectively lowering // the quality of the attachment we received. val hashMatch: DataFileInfo? = readableDatabase - .select(ID, DATA_FILE, DATA_SIZE, DATA_RANDOM, DATA_HASH_START, DATA_HASH_END, TRANSFORM_PROPERTIES, UPLOAD_TIMESTAMP, ARCHIVE_CDN) + .select(*DATA_FILE_INFO_PROJECTION) .from(TABLE_NAME) .where("$DATA_HASH_END = ? AND $DATA_HASH_END NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE AND $DATA_FILE NOT NULL", fileWriteResult.hash) .run() @@ -1197,6 +1222,10 @@ class AttachmentTable( values.put(DATA_HASH_START, hashMatch.hashEnd) values.put(DATA_HASH_END, hashMatch.hashEnd) values.put(ARCHIVE_CDN, hashMatch.archiveCdn) + values.put(ARCHIVE_TRANSFER_STATE, hashMatch.archiveTransferState) + values.put(THUMBNAIL_FILE, hashMatch.thumbnailFile) + values.put(THUMBNAIL_RANDOM, hashMatch.thumbnailRandom) + values.put(THUMBNAIL_RESTORE_STATE, hashMatch.thumbnailRestoreState) } else { values.put(DATA_FILE, fileWriteResult.file.absolutePath) values.put(DATA_SIZE, fileWriteResult.length) @@ -1748,7 +1777,7 @@ class AttachmentTable( fun getDataFileInfo(attachmentId: AttachmentId): DataFileInfo? { return readableDatabase - .select(ID, DATA_FILE, DATA_SIZE, DATA_RANDOM, DATA_HASH_START, DATA_HASH_END, TRANSFORM_PROPERTIES, UPLOAD_TIMESTAMP, ARCHIVE_CDN) + .select(*DATA_FILE_INFO_PROJECTION) .from(TABLE_NAME) .where("$ID = ?", attachmentId.id) .run() @@ -2402,7 +2431,7 @@ class AttachmentTable( // First we'll check if our file hash matches the starting or ending hash of any other attachments and has compatible transform properties. // We'll prefer the match with the most recent upload timestamp. val hashMatch: DataFileInfo? = readableDatabase - .select(ID, DATA_FILE, DATA_SIZE, DATA_RANDOM, DATA_HASH_START, DATA_HASH_END, TRANSFORM_PROPERTIES, UPLOAD_TIMESTAMP, ARCHIVE_CDN) + .select(*DATA_FILE_INFO_PROJECTION) .from(TABLE_NAME) .where("$DATA_FILE NOT NULL AND ($DATA_HASH_START = ? OR $DATA_HASH_END = ?)", fileWriteResult.hash, fileWriteResult.hash) .run() @@ -2438,6 +2467,10 @@ class AttachmentTable( contentValues.put(DATA_HASH_START, fileWriteResult.hash) contentValues.put(DATA_HASH_END, hashMatch.hashEnd) contentValues.put(ARCHIVE_CDN, hashMatch.archiveCdn) + contentValues.put(ARCHIVE_TRANSFER_STATE, hashMatch.archiveTransferState) + contentValues.put(THUMBNAIL_FILE, hashMatch.thumbnailFile) + contentValues.put(THUMBNAIL_RANDOM, hashMatch.thumbnailRandom) + contentValues.put(THUMBNAIL_RESTORE_STATE, hashMatch.thumbnailRestoreState) if (hashMatch.transformProperties.skipTransform) { Log.i(TAG, "[insertAttachmentWithData] The hash match has a DATA_HASH_END and skipTransform=true, so skipping transform of the new file as well. (MessageId: $messageId, ${attachment.uri})") @@ -2625,7 +2658,11 @@ class AttachmentTable( hashEnd = this.requireString(DATA_HASH_END), transformProperties = TransformProperties.parse(this.requireString(TRANSFORM_PROPERTIES)), uploadTimestamp = this.requireLong(UPLOAD_TIMESTAMP), - archiveCdn = this.requireInt(ARCHIVE_CDN) + archiveCdn = this.requireInt(ARCHIVE_CDN), + archiveTransferState = this.requireInt(ARCHIVE_TRANSFER_STATE), + thumbnailFile = this.requireString(THUMBNAIL_FILE), + thumbnailRandom = this.requireBlob(THUMBNAIL_RANDOM), + thumbnailRestoreState = this.requireInt(THUMBNAIL_RESTORE_STATE) ) } @@ -2689,7 +2726,11 @@ class AttachmentTable( val hashEnd: String?, val transformProperties: TransformProperties, val uploadTimestamp: Long, - val archiveCdn: Int + val archiveCdn: Int, + val archiveTransferState: Int, + val thumbnailFile: String?, + val thumbnailRandom: ByteArray?, + val thumbnailRestoreState: Int ) @VisibleForTesting diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt index 86313f8e4b..24367aa446 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt @@ -137,35 +137,31 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat /** * Given a list of media objects, find the ones that we have no knowledge of in our local store. */ - fun getMediaObjectsThatCantBeFound(objects: List): Set { + fun getMediaObjectsThatCantBeFound(objects: List): List { if (objects.isEmpty()) { - return emptySet() + return emptyList() } val queries: List = SqlUtil.buildCollectionQuery( column = MEDIA_ID, values = objects.map { it.mediaId }, - collectionOperator = SqlUtil.CollectionOperator.NOT_IN, - prefix = "$IS_THUMBNAIL = 0 AND " + collectionOperator = SqlUtil.CollectionOperator.IN ) - val out: MutableSet = mutableSetOf() + val foundObjects: MutableSet = mutableSetOf() for (query in queries) { - out += readableDatabase + foundObjects += readableDatabase .select(MEDIA_ID, CDN) .from(TABLE_NAME) .where(query.where, query.whereArgs) .run() .readToSet { - ArchivedMediaObject( - mediaId = it.requireNonNullString(MEDIA_ID), - cdn = it.requireInt(CDN) - ) + it.requireNonNullString(MEDIA_ID) } } - return out + return objects.filterNot { foundObjects.contains(it.mediaId) } } /** diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt index 6461fb1674..220a811280 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt @@ -165,10 +165,10 @@ class BackupMediaSnapshotSyncJob private constructor( } /** - * Update CDNs of archived media items. Returns set of objects that don't match + * Update CDNs of archived media items. Returns list of objects that don't match * to a local attachment DB row. */ - private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse): Set { + private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse): List { val mediaObjects = archivedItemPage.storedMediaObjects.map { ArchivedMediaObject( mediaId = it.mediaId, diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt index fa5cdd40c4..ae3f36ff86 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -118,7 +118,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A 410 -> { Log.w(TAG, "[$attachmentId] The attachment no longer exists on the transit tier. Scheduling a re-upload.") SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) - AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId)) + AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId, canReuseUpload = false)) Result.success() } 413 -> { @@ -163,7 +163,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) } else { Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE}.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) } ArchiveUploadProgress.onAttachmentFinished(attachmentId) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 17fa4642dd..cf88a93c48 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -6,6 +6,8 @@ package org.thoughtcrime.securesms.jobs import org.signal.core.util.Base64 +import org.signal.core.util.inRoundedDays +import org.signal.core.util.isNotNullOrBlank import org.signal.core.util.logging.Log import org.signal.core.util.readLength import org.signal.protos.resumableuploads.ResumableUpload @@ -32,6 +34,7 @@ import java.net.ProtocolException import kotlin.random.Random import kotlin.random.nextInt import kotlin.time.Duration.Companion.days +import kotlin.time.Duration.Companion.milliseconds /** * Given an attachmentId, this will upload the corresponding attachment to the archive cdn. @@ -40,6 +43,7 @@ import kotlin.time.Duration.Companion.days class UploadAttachmentToArchiveJob private constructor( private val attachmentId: AttachmentId, private var uploadSpec: ResumableUpload?, + private val canReuseUpload: Boolean, parameters: Parameters ) : Job(parameters) { @@ -59,9 +63,10 @@ class UploadAttachmentToArchiveJob private constructor( fun getAllQueueKeys() = (0 until MAX_JOB_QUEUES).map { buildQueueKey(queue = it) } } - constructor(attachmentId: AttachmentId) : this( + constructor(attachmentId: AttachmentId, canReuseUpload: Boolean = true) : this( attachmentId = attachmentId, uploadSpec = null, + canReuseUpload = canReuseUpload, parameters = Parameters.Builder() .addConstraint(NetworkConstraint.KEY) .setLifespan(30.days.inWholeMilliseconds) @@ -71,7 +76,9 @@ class UploadAttachmentToArchiveJob private constructor( ) override fun serialize(): ByteArray = UploadAttachmentToArchiveJobData( - attachmentId = attachmentId.id + attachmentId = attachmentId.id, + uploadSpec = uploadSpec, + canReuseUpload = canReuseUpload ).encode() override fun getFactoryKey(): String = KEY @@ -120,6 +127,13 @@ class UploadAttachmentToArchiveJob private constructor( return Result.failure() } + val timeSinceUpload = System.currentTimeMillis() - attachment.uploadTimestamp + if (canReuseUpload && timeSinceUpload > 0 && timeSinceUpload < AttachmentUploadJob.UPLOAD_REUSE_THRESHOLD && attachment.remoteLocation.isNotNullOrBlank()) { + Log.i(TAG, "We can copy an already-uploaded file. It was uploaded $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days) ago. Skipping.") + AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId)) + return Result.success() + } + if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) { Log.w(TAG, "[$attachmentId] Upload spec expired! Clearing.") uploadSpec = null @@ -200,10 +214,10 @@ class UploadAttachmentToArchiveJob private constructor( override fun onFailure() { if (this.isCanceled) { Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.NONE}.") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.NONE) } else { Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE} (if not already a permanent failure).") - SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) } } @@ -253,6 +267,7 @@ class UploadAttachmentToArchiveJob private constructor( return UploadAttachmentToArchiveJob( attachmentId = AttachmentId(data.attachmentId), uploadSpec = data.uploadSpec, + canReuseUpload = data.canReuseUpload == true, parameters = parameters ) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/wallpaper/WallpaperStorage.java b/app/src/main/java/org/thoughtcrime/securesms/wallpaper/WallpaperStorage.java index 12197f34ef..8b3b1da7eb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/wallpaper/WallpaperStorage.java +++ b/app/src/main/java/org/thoughtcrime/securesms/wallpaper/WallpaperStorage.java @@ -35,7 +35,7 @@ public final class WallpaperStorage { AttachmentId attachmentId = SignalDatabase.attachments().insertWallpaper(wallpaperStream); if (SignalStore.backup().backsUpMedia()) { - AppDependencies.getJobManager().add(new UploadAttachmentToArchiveJob(attachmentId)); + AppDependencies.getJobManager().add(new UploadAttachmentToArchiveJob(attachmentId, true)); } return ChatWallpaperFactory.create(PartAuthority.getAttachmentDataUri(attachmentId)); diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto index 21f3b9c97c..af19c3f48b 100644 --- a/app/src/main/protowire/JobData.proto +++ b/app/src/main/protowire/JobData.proto @@ -138,6 +138,7 @@ message CopyAttachmentToArchiveJobData { message UploadAttachmentToArchiveJobData { uint64 attachmentId = 1; ResumableUpload uploadSpec = 2; + optional bool canReuseUpload = 3; } message BackupMediaSnapshotSyncJobData { diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt index a5cc062de8..96c9c5bfb7 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt @@ -105,10 +105,12 @@ sealed class NetworkResult( @JvmStatic fun fromWebSocketRequest( signalWebSocket: SignalWebSocket, - request: WebSocketRequestMessage + request: WebSocketRequestMessage, + timeout: Duration = WebSocketConnection.DEFAULT_SEND_TIMEOUT ): NetworkResult = fromWebSocketRequest( signalWebSocket = signalWebSocket, request = request, + timeout = timeout, clazz = Unit::class ) diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt index fb8b40b4a5..c45faee718 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt @@ -31,6 +31,7 @@ import java.io.InputStream import java.time.Instant import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds /** * Class to interact with various archive-related endpoints. @@ -346,7 +347,7 @@ class ArchiveApi( .map { it.toArchiveCredentialPresentation().toHeaders() } .then { headers -> val request = WebSocketRequestMessage.post("/v1/archives/media/delete", DeleteArchivedMediaRequest(mediaToDelete = mediaToDelete), headers) - NetworkResult.fromWebSocketRequest(unauthWebSocket, request) + NetworkResult.fromWebSocketRequest(unauthWebSocket, request, timeout = 30.seconds) } }