Add additional CDN reconciliations to BackupMediaSnapshotSyncJob.

Co-authored-by: Cody Henthorne <cody@signal.org>
This commit is contained in:
Greyson Parrelli
2025-04-25 11:03:26 -04:00
committed by Cody Henthorne
parent 85647f1258
commit f73d929feb
13 changed files with 434 additions and 45 deletions

View File

@@ -663,6 +663,20 @@ class AttachmentTable(
}
}
/**
* Sets the archive transfer state for the given attachment by digest.
*/
fun resetArchiveTransferStateByDigest(digest: ByteArray) {
writableDatabase
.update(TABLE_NAME)
.values(
ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value,
ARCHIVE_CDN to 0
)
.where("$REMOTE_DIGEST = ?", digest)
.run()
}
/**
* Sets the archive transfer state for the given attachment and all other attachments that share the same data file.
*/

View File

@@ -6,17 +6,21 @@
package org.thoughtcrime.securesms.database
import android.content.Context
import android.database.Cursor
import androidx.annotation.VisibleForTesting
import androidx.core.content.contentValuesOf
import org.signal.core.util.SqlUtil
import org.signal.core.util.delete
import org.signal.core.util.readToList
import org.signal.core.util.readToSet
import org.signal.core.util.requireBoolean
import org.signal.core.util.requireInt
import org.signal.core.util.requireNonNullBlob
import org.signal.core.util.requireNonNullString
import org.signal.core.util.select
import org.signal.core.util.toInt
import org.signal.core.util.update
import org.signal.core.util.updateAll
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
/**
@@ -60,6 +64,11 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
*/
const val IS_THUMBNAIL = "is_thumbnail"
/**
* Timestamp when media was last seen on archive cdn. Can be reset to default.
*/
const val LAST_SEEN_ON_REMOTE_TIMESTAMP = "last_seen_on_remote_timestamp"
/**
* The remote digest for the media object. This is used to find matching attachments in the attachment table when necessary.
*/
@@ -73,7 +82,8 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
$LAST_SYNC_TIME INTEGER DEFAULT 0,
$PENDING_SYNC_TIME INTEGER,
$IS_THUMBNAIL INTEGER DEFAULT 0,
$REMOTE_DIGEST BLOB NOT NULL
$REMOTE_DIGEST BLOB NOT NULL,
$LAST_SEEN_ON_REMOTE_TIMESTAMP INTEGER DEFAULT 0
)
""".trimIndent()
}
@@ -132,24 +142,30 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
return emptySet()
}
val query = SqlUtil.buildSingleCollectionQuery(
val queries: List<SqlUtil.Query> = SqlUtil.buildCollectionQuery(
column = MEDIA_ID,
values = objects.map { it.mediaId },
collectionOperator = SqlUtil.CollectionOperator.NOT_IN,
prefix = "$IS_THUMBNAIL = 0 AND "
)
return readableDatabase
.select(MEDIA_ID, CDN)
.from(TABLE_NAME)
.where(query.where, query.whereArgs)
.run()
.readToSet {
ArchivedMediaObject(
mediaId = it.requireNonNullString(MEDIA_ID),
cdn = it.requireInt(CDN)
)
}
val out: MutableSet<ArchivedMediaObject> = mutableSetOf()
for (query in queries) {
out += readableDatabase
.select(MEDIA_ID, CDN)
.from(TABLE_NAME)
.where(query.where, query.whereArgs)
.run()
.readToSet {
ArchivedMediaObject(
mediaId = it.requireNonNullString(MEDIA_ID),
cdn = it.requireInt(CDN)
)
}
}
return out
}
/**
@@ -177,6 +193,47 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
}
}
/**
* Indicate the time that the set of media objects were seen on the archive CDN. Can be used to reconcile our local state with the server state.
*/
fun markSeenOnRemote(mediaIdBatch: Collection<String>, time: Long) {
if (mediaIdBatch.isEmpty()) {
return
}
val query = SqlUtil.buildFastCollectionQuery(MEDIA_ID, mediaIdBatch)
writableDatabase
.update(TABLE_NAME)
.values(LAST_SEEN_ON_REMOTE_TIMESTAMP to time)
.where(query.where, query.whereArgs)
.run()
}
/**
* Get all media objects who were last seen on the remote server before the given time.
* This is used to find media objects that have not been seen on the CDN, even though they should be.
*
* The cursor contains rows that can be parsed into [MediaEntry] objects.
*/
fun getMediaObjectsLastSeenOnCdnBeforeTime(time: Long): Cursor {
return readableDatabase
.select(MEDIA_ID, CDN, REMOTE_DIGEST, IS_THUMBNAIL)
.from(TABLE_NAME)
.where("$LAST_SEEN_ON_REMOTE_TIMESTAMP < $time")
.run()
}
/**
* Resets the [LAST_SEEN_ON_REMOTE_TIMESTAMP] column back to zero. It's a good idea to do this after you have run a sync and used the value, as it can
* mitigate various issues that can arise from having an incorrect local clock.
*/
fun clearLastSeenOnRemote() {
writableDatabase
.updateAll(TABLE_NAME)
.values(LAST_SEEN_ON_REMOTE_TIMESTAMP to 0)
.run()
}
private fun writePendingMediaObjectsChunk(chunk: List<MediaEntry>, pendingSyncTime: Long) {
val values = chunk.map {
contentValuesOf(
@@ -213,10 +270,21 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
val cdn: Int
)
private data class MediaEntry(
class MediaEntry(
val mediaId: String,
val cdn: Int,
val digest: ByteArray,
val isThumbnail: Boolean
)
) {
companion object {
fun fromCursor(cursor: Cursor): MediaEntry {
return MediaEntry(
mediaId = cursor.requireNonNullString(MEDIA_ID),
cdn = cursor.requireInt(CDN),
digest = cursor.requireNonNullBlob(REMOTE_DIGEST),
isThumbnail = cursor.requireBoolean(IS_THUMBNAIL)
)
}
}
}
}

View File

@@ -128,6 +128,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V270_FixChatFolderC
import org.thoughtcrime.securesms.database.helpers.migration.V271_AddNotificationProfileIdColumn
import org.thoughtcrime.securesms.database.helpers.migration.V272_UpdateUnreadCountIndices
import org.thoughtcrime.securesms.database.helpers.migration.V273_FixUnreadOriginalMessages
import org.thoughtcrime.securesms.database.helpers.migration.V274_BackupMediaSnapshotLastSeenOnRemote
import org.thoughtcrime.securesms.database.SQLiteDatabase as SignalSqliteDatabase
/**
@@ -261,10 +262,11 @@ object SignalDatabaseMigrations {
270 to V270_FixChatFolderColumnsForStorageSync,
271 to V271_AddNotificationProfileIdColumn,
272 to V272_UpdateUnreadCountIndices,
273 to V273_FixUnreadOriginalMessages
273 to V273_FixUnreadOriginalMessages,
274 to V274_BackupMediaSnapshotLastSeenOnRemote
)
const val DATABASE_VERSION = 273
const val DATABASE_VERSION = 274
@JvmStatic
fun migrate(context: Application, db: SignalSqliteDatabase, oldVersion: Int, newVersion: Int) {

View File

@@ -0,0 +1,18 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.database.helpers.migration
import android.app.Application
import org.thoughtcrime.securesms.database.SQLiteDatabase
/**
* Added a column to the backup media snapshot table to keep track of the last time we saw an object on the CDN.
*/
object V274_BackupMediaSnapshotLastSeenOnRemote : SignalDatabaseMigration {
override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
db.execSQL("ALTER TABLE backup_media_snapshot ADD COLUMN last_seen_on_remote_timestamp INTEGER DEFAULT 0")
}
}

View File

@@ -14,8 +14,7 @@ import org.thoughtcrime.securesms.keyvalue.SignalStore
import kotlin.time.Duration.Companion.days
/**
* When run, this will find the next attachment that needs to be uploaded to the archive service and upload it.
* It will enqueue a copy of itself if it thinks there is more work to be done, and that copy will continue the upload process.
* When run, this will find all of the attachments that need to be uploaded to the archive tier and enqueue [UploadAttachmentToArchiveJob]s for them.
*/
class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {

View File

@@ -5,10 +5,12 @@
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.forEach
import org.signal.core.util.logging.Log
import org.signal.core.util.nullIfBlank
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
@@ -40,7 +42,8 @@ class BackupMediaSnapshotSyncJob private constructor(
const val KEY = "BackupMediaSnapshotSyncJob"
private const val REMOTE_DELETE_BATCH_SIZE = 500
private const val REMOTE_DELETE_BATCH_SIZE = 750
private const val CDN_PAGE_SIZE = 10_000
private val BACKUP_MEDIA_SYNC_INTERVAL = 7.days.inWholeMilliseconds
fun enqueue(syncTime: Long) {
@@ -79,7 +82,9 @@ class BackupMediaSnapshotSyncJob private constructor(
return syncDataFromCdn() ?: Result.success()
}
override fun onFailure() = Unit
override fun onFailure() {
SignalDatabase.backupMediaSnapshots.clearLastSeenOnRemote()
}
/**
* Looks through our local snapshot of what attachments we put in the last backup file, and uses that to delete any old attachments from the archive CDN
@@ -92,7 +97,7 @@ class BackupMediaSnapshotSyncJob private constructor(
deleteMediaObjectsFromCdn(mediaObjects)?.let { result -> return result }
SignalDatabase.backupMediaSnapshots.deleteMediaObjects(mediaObjects)
mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(syncTime, REMOTE_DELETE_BATCH_SIZE)
mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(syncTime, CDN_PAGE_SIZE)
}
return null
@@ -135,6 +140,25 @@ class BackupMediaSnapshotSyncJob private constructor(
deleteMediaObjectsFromCdn(attachmentsToDelete)?.let { result -> return result }
}
val entriesNeedingRepairCursor = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeTime(syncTime)
val needRepairCount = entriesNeedingRepairCursor.count
if (needRepairCount > 0) {
Log.w(TAG, "Found $needRepairCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.")
entriesNeedingRepairCursor.forEach {
val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it)
// TODO [backup] Re-enqueue thumbnail uploads if necessary
if (!entry.isThumbnail) {
SignalDatabase.attachments.resetArchiveTransferStateByDigest(entry.digest)
}
}
BackupMessagesJob.enqueue()
} else {
Log.d(TAG, "No attachments need to be repaired.")
}
SignalStore.backup.lastMediaSyncTime = System.currentTimeMillis()
return null
@@ -152,6 +176,11 @@ class BackupMediaSnapshotSyncJob private constructor(
)
}
SignalDatabase.backupMediaSnapshots.markSeenOnRemote(
mediaIdBatch = mediaObjects.map { it.mediaId },
time = syncTime
)
val notFoundMediaObjects = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(mediaObjects)
val remainingObjects = mediaObjects - notFoundMediaObjects