Improve reconciliation task to reduce possible churn.

This commit is contained in:
Greyson Parrelli
2025-07-15 14:30:05 -04:00
committed by Jeffrey Starke
parent cfa96f4737
commit 4b75b9f1d6
5 changed files with 64 additions and 18 deletions

View File

@@ -204,7 +204,7 @@ class AttachmentTableTest {
// Reset the transfer state by plaintextHash+remoteKey // Reset the transfer state by plaintextHash+remoteKey
val plaintextHash = SignalDatabase.attachments.getAttachment(attachmentId)!!.dataHash!!.decodeBase64OrThrow() val plaintextHash = SignalDatabase.attachments.getAttachment(attachmentId)!!.dataHash!!.decodeBase64OrThrow()
val remoteKey = SignalDatabase.attachments.getAttachment(attachmentId)!!.remoteKey!!.decodeBase64OrThrow() val remoteKey = SignalDatabase.attachments.getAttachment(attachmentId)!!.remoteKey!!.decodeBase64OrThrow()
SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKey(plaintextHash, remoteKey) SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKeyIfNecessary(plaintextHash, remoteKey)
// Verify it's been reset // Verify it's been reset
assertThat(SignalDatabase.attachments.getAttachment(attachmentId)!!.archiveTransferState).isEqualTo(AttachmentTable.ArchiveTransferState.NONE) assertThat(SignalDatabase.attachments.getAttachment(attachmentId)!!.archiveTransferState).isEqualTo(AttachmentTable.ArchiveTransferState.NONE)

View File

@@ -276,6 +276,7 @@ class AttachmentTable(
""" """
private const val DATA_FILE_INDEX = "attachment_data_index" private const val DATA_FILE_INDEX = "attachment_data_index"
private const val DATA_HASH_REMOTE_KEY_INDEX = "attachment_data_hash_end_remote_key_index"
@JvmField @JvmField
val CREATE_INDEXS = arrayOf( val CREATE_INDEXS = arrayOf(
@@ -283,7 +284,7 @@ class AttachmentTable(
"CREATE INDEX IF NOT EXISTS attachment_transfer_state_index ON $TABLE_NAME ($TRANSFER_STATE);", "CREATE INDEX IF NOT EXISTS attachment_transfer_state_index ON $TABLE_NAME ($TRANSFER_STATE);",
"CREATE INDEX IF NOT EXISTS attachment_sticker_pack_id_index ON $TABLE_NAME ($STICKER_PACK_ID);", "CREATE INDEX IF NOT EXISTS attachment_sticker_pack_id_index ON $TABLE_NAME ($STICKER_PACK_ID);",
"CREATE INDEX IF NOT EXISTS attachment_data_hash_start_index ON $TABLE_NAME ($DATA_HASH_START);", "CREATE INDEX IF NOT EXISTS attachment_data_hash_start_index ON $TABLE_NAME ($DATA_HASH_START);",
"CREATE INDEX IF NOT EXISTS attachment_data_hash_end_remote_key_index ON $TABLE_NAME ($DATA_HASH_END, $REMOTE_KEY);", "CREATE INDEX IF NOT EXISTS $DATA_HASH_REMOTE_KEY_INDEX ON $TABLE_NAME ($DATA_HASH_END, $REMOTE_KEY);",
"CREATE INDEX IF NOT EXISTS $DATA_FILE_INDEX ON $TABLE_NAME ($DATA_FILE);", "CREATE INDEX IF NOT EXISTS $DATA_FILE_INDEX ON $TABLE_NAME ($DATA_FILE);",
"CREATE INDEX IF NOT EXISTS attachment_archive_transfer_state ON $TABLE_NAME ($ARCHIVE_TRANSFER_STATE);", "CREATE INDEX IF NOT EXISTS attachment_archive_transfer_state ON $TABLE_NAME ($ARCHIVE_TRANSFER_STATE);",
"CREATE INDEX IF NOT EXISTS attachment_remote_digest_index ON $TABLE_NAME ($REMOTE_DIGEST);" "CREATE INDEX IF NOT EXISTS attachment_remote_digest_index ON $TABLE_NAME ($REMOTE_DIGEST);"
@@ -421,6 +422,15 @@ class AttachmentTable(
.firstOrNull() .firstOrNull()
} }
fun getAttachmentIdByPlaintextHashAndRemoteKey(plaintextHash: ByteArray, remoteKey: ByteArray): AttachmentId? {
return readableDatabase
.select(ID)
.from("$TABLE_NAME INDEXED BY $DATA_HASH_REMOTE_KEY_INDEX")
.where("$DATA_HASH_END = ? AND $REMOTE_KEY = ?", Base64.encodeWithPadding(plaintextHash), Base64.encodeWithPadding(remoteKey))
.run()
.readToSingleObject { AttachmentId(it.requireLong(ID)) }
}
fun getAttachmentsForMessage(mmsId: Long): List<DatabaseAttachment> { fun getAttachmentsForMessage(mmsId: Long): List<DatabaseAttachment> {
return readableDatabase return readableDatabase
.select(*PROJECTION) .select(*PROJECTION)
@@ -730,16 +740,16 @@ class AttachmentTable(
} }
/** /**
* Sets the archive transfer state for the given attachment by digest. * Resets the archive upload state by hash/key if we believe the attachment should have been uploaded already.
*/ */
fun resetArchiveTransferStateByPlaintextHashAndRemoteKey(plaintextHash: ByteArray, remoteKey: ByteArray): Boolean { fun resetArchiveTransferStateByPlaintextHashAndRemoteKeyIfNecessary(plaintextHash: ByteArray, remoteKey: ByteArray): Boolean {
return writableDatabase return writableDatabase
.update(TABLE_NAME) .update(TABLE_NAME)
.values( .values(
ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value, ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value,
ARCHIVE_CDN to null ARCHIVE_CDN to null
) )
.where("$DATA_HASH_END = ? AND $REMOTE_KEY = ?", Base64.encodeWithPadding(plaintextHash), Base64.encodeWithPadding(remoteKey)) .where("$DATA_HASH_END = ? AND $REMOTE_KEY = ? AND $ARCHIVE_TRANSFER_STATE = ${ArchiveTransferState.FINISHED.value}", Base64.encodeWithPadding(plaintextHash), Base64.encodeWithPadding(remoteKey))
.run() > 0 .run() > 0
} }

View File

@@ -25,7 +25,7 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) :
constructor() : this( constructor() : this(
parameters = Parameters.Builder() parameters = Parameters.Builder()
.setQueue("ArchiveAttachmentBackfillJob") .setQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE)
.setMaxInstancesForQueue(2) .setMaxInstancesForQueue(2)
.setLifespan(30.days.inWholeMilliseconds) .setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED) .setMaxAttempts(Parameters.UNLIMITED)

View File

@@ -12,8 +12,10 @@ import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable
import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.ArchiveThumbnailUploadJob.Companion.isForArchiveThumbnailUploadJob
import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentReconciliationJobData import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentReconciliationJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.util.RemoteConfig import org.thoughtcrime.securesms.util.RemoteConfig
@@ -122,24 +124,53 @@ class ArchiveAttachmentReconciliationJob private constructor(
return Result.failure() return Result.failure()
} }
val mediaObjectsThatNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion) val mediaObjectsThatMayNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion)
val needReUploadCount = mediaObjectsThatNeedReUpload.count val mayNeedReUploadCount = mediaObjectsThatMayNeedReUpload.count
if (needReUploadCount > 0) { if (mayNeedReUploadCount > 0) {
Log.w(TAG, "Found $needReUploadCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.") Log.w(TAG, "Found $mayNeedReUploadCount attachments that are present in the target snapshot, but could not be found on the CDN. This could be a bookkeeping error, or the upload may still be in progress. Checking.")
mediaObjectsThatNeedReUpload.forEach { var newBackupJobRequired = false
val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it) var bookkeepingErrorCount = 0
// TODO [backup] Re-enqueue thumbnail uploads if necessary
if (!entry.isThumbnail) { mediaObjectsThatMayNeedReUpload.forEach { mediaObjectCursor ->
val success = SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKey(entry.plaintextHash, entry.remoteKey) val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(mediaObjectCursor)
if (!success) {
Log.e(TAG, "Failed to reset archive transfer state by remote hash/key!") if (entry.isThumbnail) {
val parentAttachmentId = SignalDatabase.attachments.getAttachmentIdByPlaintextHashAndRemoteKey(entry.plaintextHash, entry.remoteKey)
if (parentAttachmentId == null) {
Log.w(TAG, "Failed to find parent attachment for thumbnail that may need reupload. Skipping.")
return@forEach
}
if (AppDependencies.jobManager.find { it.isForArchiveThumbnailUploadJob(parentAttachmentId) }.isEmpty()) {
Log.w(TAG, "A thumbnail was missing from remote for $parentAttachmentId and no in-progress job was found. Re-enqueueing one.")
ArchiveThumbnailUploadJob.enqueueIfNecessary(parentAttachmentId)
bookkeepingErrorCount++
} else {
Log.i(TAG, "A thumbnail was missing from remote for $parentAttachmentId, but a job is already in progress.")
}
} else {
val wasReset = SignalDatabase.attachments.resetArchiveTransferStateByPlaintextHashAndRemoteKeyIfNecessary(entry.plaintextHash, entry.remoteKey)
if (wasReset) {
newBackupJobRequired = true
bookkeepingErrorCount++
} else {
Log.w(TAG, "Did not need to reset the the transfer state by hash/key because the attachment either no longer exists or the upload is already in-progress.")
} }
} }
} }
BackupMessagesJob.enqueue() if (bookkeepingErrorCount > 0) {
Log.w(TAG, "Found that $bookkeepingErrorCount/$mayNeedReUploadCount of the CDN mismatches were bookkeeping errors.")
} else {
Log.i(TAG, "None of the $mayNeedReUploadCount CDN mismatches were bookkeeping errors.")
}
if (newBackupJobRequired) {
Log.w(TAG, "Some of the errors require re-uploading a new backup job to resolve.")
BackupMessagesJob.enqueue()
}
} else { } else {
Log.d(TAG, "No attachments need to be repaired.") Log.d(TAG, "No attachments need to be repaired.")
} }

View File

@@ -17,6 +17,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobs.protos.ArchiveThumbnailUploadJobData import org.thoughtcrime.securesms.jobs.protos.ArchiveThumbnailUploadJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.mms.DecryptableStreamUriLoader.DecryptableUri import org.thoughtcrime.securesms.mms.DecryptableStreamUriLoader.DecryptableUri
@@ -58,6 +59,10 @@ class ArchiveThumbnailUploadJob private constructor(
AppDependencies.jobManager.add(ArchiveThumbnailUploadJob(attachmentId)) AppDependencies.jobManager.add(ArchiveThumbnailUploadJob(attachmentId))
} }
} }
fun JobSpec.isForArchiveThumbnailUploadJob(attachmentId: AttachmentId): Boolean {
return this.factoryKey == KEY && this.serializedData?.let { ArchiveThumbnailUploadJobData.ADAPTER.decode(it).attachmentId } == attachmentId.id
}
} }
private constructor(attachmentId: AttachmentId) : this( private constructor(attachmentId: AttachmentId) : this(