From 4b75b9f1d6c87fdcd42d4643b570124d9e7c4eca Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Tue, 15 Jul 2025 14:30:05 -0400 Subject: [PATCH] Improve reconciliation task to reduce possible churn. --- .../securesms/database/AttachmentTableTest.kt | 2 +- .../securesms/database/AttachmentTable.kt | 18 ++++-- .../jobs/ArchiveAttachmentBackfillJob.kt | 2 +- .../ArchiveAttachmentReconciliationJob.kt | 55 +++++++++++++++---- .../jobs/ArchiveThumbnailUploadJob.kt | 5 ++ 5 files changed, 64 insertions(+), 18 deletions(-) diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/database/AttachmentTableTest.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/database/AttachmentTableTest.kt index 9e6d2a453a..9234ac5a15 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/database/AttachmentTableTest.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/database/AttachmentTableTest.kt @@ -204,7 +204,7 @@ class AttachmentTableTest { // Reset the transfer state by plaintextHash+remoteKey val plaintextHash = SignalDatabase.attachments.getAttachment(attachmentId)!!.dataHash!!.decodeBase64OrThrow() val remoteKey = SignalDatabase.attachments.getAttachment(attachmentId)!!.remoteKey!!.decodeBase64OrThrow() - SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKey(plaintextHash, remoteKey) + SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKeyIfNecessary(plaintextHash, remoteKey) // Verify it's been reset assertThat(SignalDatabase.attachments.getAttachment(attachmentId)!!.archiveTransferState).isEqualTo(AttachmentTable.ArchiveTransferState.NONE) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index 5dda8a5672..043be5f5c3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -276,6 +276,7 @@ class AttachmentTable( """ private const val DATA_FILE_INDEX = "attachment_data_index" + private const val DATA_HASH_REMOTE_KEY_INDEX = "attachment_data_hash_end_remote_key_index" @JvmField val CREATE_INDEXS = arrayOf( @@ -283,7 +284,7 @@ class AttachmentTable( "CREATE INDEX IF NOT EXISTS attachment_transfer_state_index ON $TABLE_NAME ($TRANSFER_STATE);", "CREATE INDEX IF NOT EXISTS attachment_sticker_pack_id_index ON $TABLE_NAME ($STICKER_PACK_ID);", "CREATE INDEX IF NOT EXISTS attachment_data_hash_start_index ON $TABLE_NAME ($DATA_HASH_START);", - "CREATE INDEX IF NOT EXISTS attachment_data_hash_end_remote_key_index ON $TABLE_NAME ($DATA_HASH_END, $REMOTE_KEY);", + "CREATE INDEX IF NOT EXISTS $DATA_HASH_REMOTE_KEY_INDEX ON $TABLE_NAME ($DATA_HASH_END, $REMOTE_KEY);", "CREATE INDEX IF NOT EXISTS $DATA_FILE_INDEX ON $TABLE_NAME ($DATA_FILE);", "CREATE INDEX IF NOT EXISTS attachment_archive_transfer_state ON $TABLE_NAME ($ARCHIVE_TRANSFER_STATE);", "CREATE INDEX IF NOT EXISTS attachment_remote_digest_index ON $TABLE_NAME ($REMOTE_DIGEST);" @@ -421,6 +422,15 @@ class AttachmentTable( .firstOrNull() } + fun getAttachmentIdByPlaintextHashAndRemoteKey(plaintextHash: ByteArray, remoteKey: ByteArray): AttachmentId? { + return readableDatabase + .select(ID) + .from("$TABLE_NAME INDEXED BY $DATA_HASH_REMOTE_KEY_INDEX") + .where("$DATA_HASH_END = ? AND $REMOTE_KEY = ?", Base64.encodeWithPadding(plaintextHash), Base64.encodeWithPadding(remoteKey)) + .run() + .readToSingleObject { AttachmentId(it.requireLong(ID)) } + } + fun getAttachmentsForMessage(mmsId: Long): List { return readableDatabase .select(*PROJECTION) @@ -730,16 +740,16 @@ class AttachmentTable( } /** - * Sets the archive transfer state for the given attachment by digest. + * Resets the archive upload state by hash/key if we believe the attachment should have been uploaded already. */ - fun resetArchiveTransferStateByPlaintextHashAndRemoteKey(plaintextHash: ByteArray, remoteKey: ByteArray): Boolean { + fun resetArchiveTransferStateByPlaintextHashAndRemoteKeyIfNecessary(plaintextHash: ByteArray, remoteKey: ByteArray): Boolean { return writableDatabase .update(TABLE_NAME) .values( ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value, ARCHIVE_CDN to null ) - .where("$DATA_HASH_END = ? AND $REMOTE_KEY = ?", Base64.encodeWithPadding(plaintextHash), Base64.encodeWithPadding(remoteKey)) + .where("$DATA_HASH_END = ? AND $REMOTE_KEY = ? AND $ARCHIVE_TRANSFER_STATE = ${ArchiveTransferState.FINISHED.value}", Base64.encodeWithPadding(plaintextHash), Base64.encodeWithPadding(remoteKey)) .run() > 0 } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt index 047e478562..61cfbcb28a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt @@ -25,7 +25,7 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) : constructor() : this( parameters = Parameters.Builder() - .setQueue("ArchiveAttachmentBackfillJob") + .setQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) .setMaxInstancesForQueue(2) .setLifespan(30.days.inWholeMilliseconds) .setMaxAttempts(Parameters.UNLIMITED) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt index 2e0d1f22fb..8b970fdbcf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt @@ -12,8 +12,10 @@ import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.ArchiveThumbnailUploadJob.Companion.isForArchiveThumbnailUploadJob import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentReconciliationJobData import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.util.RemoteConfig @@ -122,24 +124,53 @@ class ArchiveAttachmentReconciliationJob private constructor( return Result.failure() } - val mediaObjectsThatNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion) - val needReUploadCount = mediaObjectsThatNeedReUpload.count + val mediaObjectsThatMayNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion) + val mayNeedReUploadCount = mediaObjectsThatMayNeedReUpload.count - if (needReUploadCount > 0) { - Log.w(TAG, "Found $needReUploadCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.") + if (mayNeedReUploadCount > 0) { + Log.w(TAG, "Found $mayNeedReUploadCount attachments that are present in the target snapshot, but could not be found on the CDN. This could be a bookkeeping error, or the upload may still be in progress. Checking.") - mediaObjectsThatNeedReUpload.forEach { - val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it) - // TODO [backup] Re-enqueue thumbnail uploads if necessary - if (!entry.isThumbnail) { - val success = SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKey(entry.plaintextHash, entry.remoteKey) - if (!success) { - Log.e(TAG, "Failed to reset archive transfer state by remote hash/key!") + var newBackupJobRequired = false + var bookkeepingErrorCount = 0 + + mediaObjectsThatMayNeedReUpload.forEach { mediaObjectCursor -> + val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(mediaObjectCursor) + + if (entry.isThumbnail) { + val parentAttachmentId = SignalDatabase.attachments.getAttachmentIdByPlaintextHashAndRemoteKey(entry.plaintextHash, entry.remoteKey) + if (parentAttachmentId == null) { + Log.w(TAG, "Failed to find parent attachment for thumbnail that may need reupload. Skipping.") + return@forEach + } + + if (AppDependencies.jobManager.find { it.isForArchiveThumbnailUploadJob(parentAttachmentId) }.isEmpty()) { + Log.w(TAG, "A thumbnail was missing from remote for $parentAttachmentId and no in-progress job was found. Re-enqueueing one.") + ArchiveThumbnailUploadJob.enqueueIfNecessary(parentAttachmentId) + bookkeepingErrorCount++ + } else { + Log.i(TAG, "A thumbnail was missing from remote for $parentAttachmentId, but a job is already in progress.") + } + } else { + val wasReset = SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKeyIfNecessary(entry.plaintextHash, entry.remoteKey) + if (wasReset) { + newBackupJobRequired = true + bookkeepingErrorCount++ + } else { + Log.w(TAG, "Did not need to reset the the transfer state by hash/key because the attachment either no longer exists or the upload is already in-progress.") } } } - BackupMessagesJob.enqueue() + if (bookkeepingErrorCount > 0) { + Log.w(TAG, "Found that $bookkeepingErrorCount/$mayNeedReUploadCount of the CDN mismatches were bookkeeping errors.") + } else { + Log.i(TAG, "None of the $mayNeedReUploadCount CDN mismatches were bookkeeping errors.") + } + + if (newBackupJobRequired) { + Log.w(TAG, "Some of the errors require re-uploading a new backup job to resolve.") + BackupMessagesJob.enqueue() + } } else { Log.d(TAG, "No attachments need to be repaired.") } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt index f88a3d2b71..f2dd3f7f55 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt @@ -17,6 +17,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobs.protos.ArchiveThumbnailUploadJobData import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.mms.DecryptableStreamUriLoader.DecryptableUri @@ -58,6 +59,10 @@ class ArchiveThumbnailUploadJob private constructor( AppDependencies.jobManager.add(ArchiveThumbnailUploadJob(attachmentId)) } } + + fun JobSpec.isForArchiveThumbnailUploadJob(attachmentId: AttachmentId): Boolean { + return this.factoryKey == KEY && this.serializedData?.let { ArchiveThumbnailUploadJobData.ADAPTER.decode(it).attachmentId } == attachmentId.id + } } private constructor(attachmentId: AttachmentId) : this(