Bring back proper archive delete reconciliation.

This commit is contained in:
Greyson Parrelli
2025-09-15 16:00:26 -04:00
parent a575626abb
commit 7f429dc769
3 changed files with 96 additions and 25 deletions

View File

@@ -73,6 +73,7 @@ import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.attachments.LocalStickerAttachment
import org.thoughtcrime.securesms.attachments.WallpaperAttachment
import org.thoughtcrime.securesms.audio.AudioHash
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.exporters.ChatItemArchiveExporter
import org.thoughtcrime.securesms.backup.v2.proto.BackupDebugInfo
import org.thoughtcrime.securesms.blurhash.BlurHash
@@ -3174,6 +3175,32 @@ class AttachmentTable(
}
}
fun getMediaObjectsThatCantBeFound(objects: Set<ArchivedMediaObject>): Set<ArchivedMediaObject> {
if (objects.isEmpty()) {
return emptySet()
}
val objectsByMediaId: MutableMap<String, ArchivedMediaObject> = objects.associateBy { it.mediaId }.toMutableMap()
readableDatabase
.select(*PROJECTION)
.from(TABLE_NAME)
.where("$REMOTE_KEY NOT NULL AND $DATA_HASH_END NOT NULL")
.groupBy("$DATA_HASH_END, $REMOTE_KEY")
.run()
.forEach { cursor ->
val remoteKey = Base64.decode(cursor.requireNonNullString(REMOTE_KEY))
val plaintextHash = Base64.decode(cursor.requireNonNullString(DATA_HASH_END))
val mediaId = MediaName.fromPlaintextHashAndRemoteKey(plaintextHash, remoteKey).toMediaId(SignalStore.backup.mediaRootBackupKey).encode()
val mediaIdThumbnail = MediaName.fromPlaintextHashAndRemoteKeyForThumbnail(plaintextHash, remoteKey).toMediaId(SignalStore.backup.mediaRootBackupKey).encode()
objectsByMediaId.remove(mediaId)
objectsByMediaId.remove(mediaIdThumbnail)
}
return objectsByMediaId.values.toSet()
}
/**
* Important: This is an expensive query that involves iterating over every row in the table. Only call this for debug stuff!
*/
@@ -3189,7 +3216,6 @@ class AttachmentTable(
.groupBy(DATA_HASH_END)
.run()
.forEach { cursor ->
val remoteKey = Base64.decode(cursor.requireNonNullString(REMOTE_KEY))
val plaintextHash = Base64.decode(cursor.requireNonNullString(DATA_HASH_END))
val mediaId = MediaName.fromPlaintextHashAndRemoteKey(plaintextHash, remoteKey).toMediaId(SignalStore.backup.mediaRootBackupKey).value.toByteString()

View File

@@ -11,6 +11,7 @@ import androidx.annotation.VisibleForTesting
import androidx.core.content.contentValuesOf
import org.signal.core.util.SqlUtil
import org.signal.core.util.delete
import org.signal.core.util.forEach
import org.signal.core.util.readToList
import org.signal.core.util.readToSet
import org.signal.core.util.readToSingleLong
@@ -214,6 +215,8 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
return emptySet()
}
val objectsByMediaId: MutableMap<String, ArchivedMediaObject> = objects.associateBy { it.mediaId }.toMutableMap()
val queries: List<SqlUtil.Query> = SqlUtil.buildCollectionQuery(
column = MEDIA_ID,
values = objects.map { it.mediaId },
@@ -221,20 +224,19 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
prefix = "$SNAPSHOT_VERSION = $MAX_VERSION AND "
)
val foundObjects: MutableSet<String> = mutableSetOf()
for (query in queries) {
foundObjects += readableDatabase
readableDatabase
.select(MEDIA_ID, CDN)
.from(TABLE_NAME)
.where(query.where, query.whereArgs)
.run()
.readToSet {
it.requireNonNullString(MEDIA_ID)
.forEach {
val mediaId = it.requireNonNullString(MEDIA_ID)
objectsByMediaId.remove(mediaId)
}
}
return objects.filterNot { foundObjects.contains(it.mediaId) }.toSet()
return objectsByMediaId.values.toSet()
}
fun getMediaEntriesForObjects(objects: List<ArchivedMediaObject>): Set<MediaEntry> {

View File

@@ -10,7 +10,9 @@ import android.app.PendingIntent
import android.content.Intent
import androidx.core.app.NotificationCompat
import androidx.core.app.NotificationManagerCompat
import org.signal.core.util.EventTimer
import org.signal.core.util.PendingIntentFlags
import org.signal.core.util.Stopwatch
import org.signal.core.util.forEach
import org.signal.core.util.logging.Log
import org.signal.core.util.nullIfBlank
@@ -60,6 +62,7 @@ class ArchiveAttachmentReconciliationJob private constructor(
const val KEY = "ArchiveAttachmentReconciliationJob"
private const val CDN_FETCH_LIMIT = 10_000
private const val DELETE_BATCH_SIZE = 10_000
}
constructor(forced: Boolean = false) : this(
@@ -125,6 +128,9 @@ class ArchiveAttachmentReconciliationJob private constructor(
* that was made before all of the attachments had been uploaded).
*/
private fun syncDataFromCdn(snapshotVersion: Long): Result? {
val stopwatch = Stopwatch("sync")
val eventTimer = EventTimer()
val pendingRemoteDeletes: MutableSet<ArchivedMediaObject> = mutableSetOf()
do {
if (isCanceled) {
Log.w(TAG, "Job cancelled while syncing archived attachments from the CDN.", true)
@@ -139,7 +145,12 @@ class ArchiveAttachmentReconciliationJob private constructor(
Log.d(TAG, "Fetched CDN page. Requested size: $CDN_FETCH_LIMIT, Actual size: ${archivedItemPage.storedMediaObjects.size}")
syncCdnPage(archivedItemPage, snapshotVersion)?.let { return it }
pendingRemoteDeletes += syncCdnPage(archivedItemPage, snapshotVersion)
if (pendingRemoteDeletes.size > DELETE_BATCH_SIZE) {
validateAndDeleteFromRemote(pendingRemoteDeletes)?.let { return it }
pendingRemoteDeletes.clear()
}
eventTimer.emit("page")
serverCursor = archivedItemPage.cursor
} while (serverCursor != null)
@@ -148,11 +159,22 @@ class ArchiveAttachmentReconciliationJob private constructor(
Log.w(TAG, "Job cancelled while syncing archived attachments from the CDN.", true)
return Result.failure()
}
stopwatch.split("fetch-and-delete")
Log.d(TAG, "BEFORE:\n" + SignalDatabase.attachments.debugGetAttachmentStats().prettyString(), true)
if (pendingRemoteDeletes.isNotEmpty()) {
validateAndDeleteFromRemote(pendingRemoteDeletes)?.let { return it }
pendingRemoteDeletes.clear()
}
stopwatch.split("final-delete")
Log.d(TAG, eventTimer.stop().summary)
Log.d(TAG, "BEFORE:\n" + SignalDatabase.attachments.debugGetAttachmentStats().shortPrettyString(), true)
stopwatch.split("stats-before")
val mediaObjectsThatMayNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion)
val mayNeedReUploadCount = mediaObjectsThatMayNeedReUpload.count
stopwatch.split("last-seen")
val mediaIdsThatNeedUpload = mutableSetOf<MediaId>()
val internalUser = RemoteConfig.internalUser
@@ -193,6 +215,7 @@ class ArchiveAttachmentReconciliationJob private constructor(
}
}
}
stopwatch.split("mark-reupload")
if (bookkeepingErrorCount > 0) {
Log.w(TAG, "Found that $bookkeepingErrorCount/$mayNeedReUploadCount of the CDN mismatches were bookkeeping errors.", true)
@@ -200,7 +223,8 @@ class ArchiveAttachmentReconciliationJob private constructor(
Log.i(TAG, "None of the $mayNeedReUploadCount CDN mismatches were bookkeeping errors.", true)
}
Log.d(TAG, "AFTER:\n" + SignalDatabase.attachments.debugGetAttachmentStats().prettyString(), true)
Log.d(TAG, "AFTER:\n" + SignalDatabase.attachments.debugGetAttachmentStats().shortPrettyString(), true)
stopwatch.split("stats-after")
if (internalUser && mediaIdsThatNeedUpload.isNotEmpty()) {
Log.w(TAG, "Starting internal-only lookup of matching attachments. May take a while!", true)
@@ -217,6 +241,7 @@ class ArchiveAttachmentReconciliationJob private constructor(
Log.w(TAG, "[Fullsize] Needed Upload: attachmentId=${attachment.attachmentId}, messageId=${attachment.mmsId}, contentType=${attachment.contentType}, quote=${attachment.quote}, transferState=${attachment.transferState}, archiveTransferState=${attachment.archiveTransferState}, hasData=${attachment.hasData}", true)
}
}
stopwatch.split("internal-lookup")
}
if (newBackupJobRequired) {
@@ -240,6 +265,8 @@ class ArchiveAttachmentReconciliationJob private constructor(
SignalStore.backup.remoteStorageGarbageCollectionPending = false
SignalStore.backup.lastAttachmentReconciliationTime = System.currentTimeMillis()
stopwatch.stop(TAG)
return null
}
@@ -249,9 +276,9 @@ class ArchiveAttachmentReconciliationJob private constructor(
* - Fix any CDN mismatches by updating our local store with the correct CDN.
* - Delete any orphaned attachments that are on the CDN but not in our local store.
*
* @return Null if successful, or a [Result] indicating the failure reason.
* @return A list of media objects that should be deleted (after being verified)
*/
private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse, currentSnapshotVersion: Long): Result? {
private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse, currentSnapshotVersion: Long): Set<ArchivedMediaObject> {
val mediaObjects = archivedItemPage.storedMediaObjects.map {
ArchivedMediaObject(
mediaId = it.mediaId,
@@ -267,11 +294,6 @@ class ArchiveAttachmentReconciliationJob private constructor(
val mediaOnRemoteButNotLocal = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(mediaObjects)
val mediaObjectsOnBothRemoteAndLocal = mediaObjects - mediaOnRemoteButNotLocal
// TODO [backups] Temporarily remove deletes
// if (RemoteConfig.internalUser && mediaOnRemoteButNotLocal.isNotEmpty()) {
// Log.w(TAG, "MediaIds of items on remote but not local: ${mediaOnRemoteButNotLocal.joinToString(", ") { it.mediaId }}", true)
// }
val cdnMismatches = SignalDatabase.backupMediaSnapshots.getMediaObjectsWithNonMatchingCdn(mediaObjectsOnBothRemoteAndLocal)
if (cdnMismatches.isNotEmpty()) {
Log.w(TAG, "Found ${cdnMismatches.size} items with CDNs that differ from what we have locally. Updating our local store.")
@@ -280,14 +302,7 @@ class ArchiveAttachmentReconciliationJob private constructor(
}
}
// TODO [backups] Temporarily remove deletes
// val deleteResult = ArchiveCommitAttachmentDeletesJob.deleteMediaObjectsFromCdn(TAG, mediaOnRemoteButNotLocal, this::defaultBackoff, this::isCanceled)
// if (deleteResult != null) {
// Log.w(TAG, "Failed to delete orphaned attachments from the CDN. Returning failure.")
// return deleteResult
// }
return null
return mediaOnRemoteButNotLocal
}
/**
@@ -316,6 +331,34 @@ class ArchiveAttachmentReconciliationJob private constructor(
}
}
/**
* Deletes attachments from the archive CDN, after verifying that they also can't be found anywhere in [org.thoughtcrime.securesms.database.AttachmentTable]
* either. Checking the attachment table is very expensive and independent of query size, which is why we batch the lookups.
*
* @return A non-successful [Result] in the case of failure, otherwise null for success.
*/
private fun validateAndDeleteFromRemote(deletes: Set<ArchivedMediaObject>): Result? {
val stopwatch = Stopwatch("remote-delete")
val validatedDeletes = SignalDatabase.attachments.getMediaObjectsThatCantBeFound(deletes)
Log.d(TAG, "Found that ${validatedDeletes.size}/${deletes.size} requested remote deletes were valid based on current attachment table state.")
stopwatch.split("validate")
if (validatedDeletes.isEmpty()) {
return null
}
val deleteResult = ArchiveCommitAttachmentDeletesJob.deleteMediaObjectsFromCdn(TAG, validatedDeletes, this::defaultBackoff, this::isCanceled)
if (deleteResult != null) {
Log.w(TAG, "Failed to delete orphaned attachments from the CDN. Returning failure.")
return deleteResult
}
stopwatch.split("network")
stopwatch.stop(TAG)
return null
}
private fun maybePostReconciliationFailureNotification() {
if (!RemoteConfig.internalUser) {
return