From 9860b990e5be40df9bef5aad4e318d900a876607 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 5 Jun 2025 12:29:19 -0400 Subject: [PATCH] Split archive deletes and reconciliations into separate jobs. --- .../securesms/backup/ArchiveUploadProgress.kt | 4 +- .../database/BackupMediaSnapshotTable.kt | 10 +- .../ArchiveAttachmentReconciliationJob.kt | 216 +++++++++++++++ .../jobs/ArchiveCommitAttachmentDeletesJob.kt | 123 +++++++++ .../jobs/BackupMediaSnapshotSyncJob.kt | 252 ------------------ .../securesms/jobs/BackupMessagesJob.kt | 5 +- .../securesms/jobs/JobManagerFactories.java | 4 +- .../securesms/keyvalue/BackupValues.kt | 4 +- .../logsubmit/LogSectionRemoteBackups.kt | 2 +- .../securesms/util/RemoteConfig.kt | 29 ++ app/src/main/protowire/JobData.proto | 4 +- .../signalservice/api/archive/ArchiveApi.kt | 24 +- 12 files changed, 405 insertions(+), 272 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveCommitAttachmentDeletesJob.kt delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt index 298418d7cf..7377a9a867 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt @@ -20,6 +20,7 @@ import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.jobs.ArchiveCommitAttachmentDeletesJob import org.thoughtcrime.securesms.jobs.ArchiveThumbnailUploadJob import 
org.thoughtcrime.securesms.jobs.BackfillDigestJob import org.thoughtcrime.securesms.jobs.UploadAttachmentToArchiveJob @@ -108,6 +109,7 @@ object ArchiveUploadProgress { } AppDependencies.jobManager.cancelAllInQueue(BackfillDigestJob.QUEUE) + AppDependencies.jobManager.cancelAllInQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) UploadAttachmentToArchiveJob.getAllQueueKeys().forEach { AppDependencies.jobManager.cancelAllInQueue(it) } @@ -123,7 +125,7 @@ object ArchiveUploadProgress { Log.d(TAG, "Flushing job manager queue...") AppDependencies.jobManager.flush() - val queues = setOf(BackfillDigestJob.QUEUE, ArchiveThumbnailUploadJob.KEY) + UploadAttachmentToArchiveJob.getAllQueueKeys() + val queues = setOf(BackfillDigestJob.QUEUE, ArchiveThumbnailUploadJob.KEY, ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) + UploadAttachmentToArchiveJob.getAllQueueKeys() Log.d(TAG, "Waiting for cancelations to occur...") while (!AppDependencies.jobManager.areQueuesEmpty(queues)) { delay(1.seconds) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt index 0bb6bff984..747f3b7fe9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/BackupMediaSnapshotTable.kt @@ -195,9 +195,9 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat * * We purposely allow pending items here -- so long as they were in the most recent complete snapshot, we want to keep them. 
*/ - fun getMediaObjectsThatCantBeFound(objects: List): List { + fun getMediaObjectsThatCantBeFound(objects: List): Set { if (objects.isEmpty()) { - return emptyList() + return emptySet() } val queries: List = SqlUtil.buildCollectionQuery( @@ -220,7 +220,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat } } - return objects.filterNot { foundObjects.contains(it.mediaId) } + return objects.filterNot { foundObjects.contains(it.mediaId) }.toSet() } /** @@ -268,7 +268,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat } /** - * Get all media objects who were last seen on the remote server before the given time. + * Get all media objects in specified snapshot who were last seen on the CDN before that snapshot. * This is used to find media objects that have not been seen on the CDN, even though they should be. * * The cursor contains rows that can be parsed into [MediaEntry] objects. @@ -279,7 +279,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat return readableDatabase .select(MEDIA_ID, CDN, REMOTE_DIGEST, IS_THUMBNAIL) .from(TABLE_NAME) - .where("$LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION < $snapshotVersion AND $SNAPSHOT_VERSION = $MAX_VERSION") + .where("$LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION < $snapshotVersion AND $SNAPSHOT_VERSION = $snapshotVersion") .run() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt new file mode 100644 index 0000000000..0c575aa6f5 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentReconciliationJob.kt @@ -0,0 +1,216 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import org.signal.core.util.forEach +import org.signal.core.util.logging.Log +import 
org.signal.core.util.nullIfBlank +import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject +import org.thoughtcrime.securesms.backup.v2.BackupRepository +import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentReconciliationJobData +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.util.RemoteConfig +import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.archive.ArchiveGetMediaItemsResponse +import java.lang.RuntimeException +import kotlin.time.Duration.Companion.days + +/** + * We do our best to keep our local attachments in sync with the archive CDN, but we still want to have a backstop that periodically + * checks to make sure things are in sync, and corrects it if it isn't. + * + * Specifically, this job does three important things: + * + * 1. Ensures that orphaned attachments on the CDN (i.e. attachments that are on the CDN but are no longer tied to the most recent backup) are deleted. + * 2. Ensures that attachments we thought were uploaded to the CDN, but are no longer there, are re-uploaded. + * 3. Keeps the CDN numbers in sync with our local database. There are known situations after the initial restore when we actually don't know the CDN, and after + * the initial restore, there's always the chance that something falls out of sync, so may as well check it then as well since we're getting the data anyway. 
+ */ +class ArchiveAttachmentReconciliationJob private constructor( + private var snapshotVersion: Long?, + private var serverCursor: String?, + parameters: Parameters +) : Job(parameters) { + + companion object { + + private val TAG = Log.tag(ArchiveAttachmentReconciliationJob::class) + + const val KEY = "ArchiveAttachmentReconciliationJob" + + private const val CDN_FETCH_LIMIT = 10_000 + } + + constructor() : this( + snapshotVersion = null, + serverCursor = null, + parameters = Parameters.Builder() + .addConstraint(NetworkConstraint.KEY) + .setQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) + .setMaxInstancesForQueue(2) + .setMaxAttempts(Parameters.UNLIMITED) + .setLifespan(1.days.inWholeMilliseconds) + .build() + ) + + override fun serialize(): ByteArray = ArchiveAttachmentReconciliationJobData(snapshotVersion, serverCursor ?: "").encode() + + override fun getFactoryKey(): String = KEY + + override fun run(): Result { + val timeSinceLastSync = System.currentTimeMillis() - SignalStore.backup.lastAttachmentReconciliationTime + if (serverCursor == null && timeSinceLastSync > 0 && timeSinceLastSync < RemoteConfig.archiveReconciliationSyncInterval.inWholeMilliseconds) { + Log.d(TAG, "No need to do a remote sync yet. Time since last sync: $timeSinceLastSync ms") + return Result.success() + } + + // It's possible a new backup could be started while this job is running. If we don't keep a consistent view of the snapshot version, the logic + // we use to determine which attachments need to be re-uploaded will possibly result in us unnecessarily re-uploading attachments. + snapshotVersion = snapshotVersion ?: SignalDatabase.backupMediaSnapshots.getCurrentSnapshotVersion() + + return syncDataFromCdn(snapshotVersion!!) ?: Result.success() + } + + override fun onFailure() = Unit + + /** + * Fetches all attachment metadata from the archive CDN and ensures that our local store is in sync with it. 
+ * + * Specifically, we make sure that: + * (1) We delete any attachments from the CDN that we have no knowledge of in any backup. + * (2) We ensure that our local store has the correct CDN for any attachments on the CDN (they should only really fall out of sync when you restore a backup + * that was made before all of the attachments had been uploaded). + */ + private fun syncDataFromCdn(snapshotVersion: Long): Result? { + do { + if (isCanceled) { + Log.w(TAG, "Job cancelled while syncing archived attachments from the CDN.") + return Result.failure() + } + + val (archivedItemPage, jobResult) = getRemoteArchiveItemPage(serverCursor) + if (jobResult != null) { + return jobResult + } + check(archivedItemPage != null) + + syncCdnPage(archivedItemPage, snapshotVersion)?.let { return it } + + serverCursor = archivedItemPage.cursor + } while (serverCursor != null) + + if (isCanceled) { + Log.w(TAG, "Job cancelled while syncing archived attachments from the CDN.") + return Result.failure() + } + + val mediaObjectsThatNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion) + val needReUploadCount = mediaObjectsThatNeedReUpload.count + + if (needReUploadCount > 0) { + Log.w(TAG, "Found $needReUploadCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.") + + mediaObjectsThatNeedReUpload.forEach { + val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it) + // TODO [backup] Re-enqueue thumbnail uploads if necessary + if (!entry.isThumbnail) { + SignalDatabase.attachments.resetArchiveTransferStateByDigest(entry.digest) + } + } + + BackupMessagesJob.enqueue() + } else { + Log.d(TAG, "No attachments need to be repaired.") + } + + SignalStore.backup.lastAttachmentReconciliationTime = System.currentTimeMillis() + + return null + } + + /** + * Given a page of archived media items, this method will: + * - Mark that page as seen on the remote. 
+ * - Fix any CDN mismatches by updating our local store with the correct CDN. + * - Delete any orphaned attachments that are on the CDN but not in our local store. + * + * @return Null if successful, or a [Result] indicating the failure reason. + */ + private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse, currentSnapshotVersion: Long): Result? { + val mediaObjects = archivedItemPage.storedMediaObjects.map { + ArchivedMediaObject( + mediaId = it.mediaId, + cdn = it.cdn + ) + } + + SignalDatabase.backupMediaSnapshots.markSeenOnRemote( + mediaIdBatch = mediaObjects.map { it.mediaId }, + snapshotVersion = currentSnapshotVersion + ) + + val mediaOnRemoteButNotLocal = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(mediaObjects) + val mediaObjectsOnBothRemoteAndLocal = mediaObjects - mediaOnRemoteButNotLocal + + val cdnMismatches = SignalDatabase.backupMediaSnapshots.getMediaObjectsWithNonMatchingCdn(mediaObjectsOnBothRemoteAndLocal) + if (cdnMismatches.isNotEmpty()) { + Log.w(TAG, "Found ${cdnMismatches.size} items with CDNs that differ from what we have locally. Updating our local store.") + for (mismatch in cdnMismatches) { + SignalDatabase.attachments.setArchiveCdnByDigest(mismatch.digest, mismatch.cdn) + } + } + + val deleteResult = ArchiveCommitAttachmentDeletesJob.deleteMediaObjectsFromCdn(TAG, mediaOnRemoteButNotLocal, this::defaultBackoff, this::isCanceled) + if (deleteResult != null) { + Log.w(TAG, "Failed to delete orphaned attachments from the CDN. Returning failure.") + return deleteResult + } + + return null + } + + /** + * Fetches a page of archived media items from the CDN. + * + * @param cursor The cursor to use for pagination, or null to start from the beginning. + * @return The [ArchiveGetMediaItemsResponse] if successful, or null with a [Result] indicating the failure reason. 
+ */ + private fun getRemoteArchiveItemPage(cursor: String?): Pair { + return when (val result = BackupRepository.listRemoteMediaObjects(CDN_FETCH_LIMIT, cursor)) { + is NetworkResult.Success -> result.result to null + is NetworkResult.NetworkError -> return null to Result.retry(defaultBackoff()) + is NetworkResult.StatusCodeError -> { + if (result.code == 429) { + Log.w(TAG, "Rate limited while attempting to list media objects. Retrying later.") + return null to Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) + } else { + Log.w(TAG, "Failed to list remote media objects with code: ${result.code}. Unable to proceed.", result.getCause()) + return null to Result.failure() + } + } + is NetworkResult.ApplicationError -> { + Log.w(TAG, "Failed to list remote media objects due to a crash.", result.getCause()) + return null to Result.fatalFailure(RuntimeException(result.getCause())) + } + } + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentReconciliationJob { + val data = ArchiveAttachmentReconciliationJobData.ADAPTER.decode(serializedData!!) 
+ + return ArchiveAttachmentReconciliationJob( + snapshotVersion = data.snapshot, + serverCursor = data.serverCursor.nullIfBlank(), + parameters = parameters + ) + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveCommitAttachmentDeletesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveCommitAttachmentDeletesJob.kt new file mode 100644 index 0000000000..756b9b4c80 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveCommitAttachmentDeletesJob.kt @@ -0,0 +1,123 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject +import org.thoughtcrime.securesms.backup.v2.BackupRepository +import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.jobmanager.Job +import org.whispersystems.signalservice.api.NetworkResult +import java.lang.RuntimeException +import kotlin.time.Duration.Companion.days +import kotlin.time.Duration.Companion.hours + +/** + * When we delete media throughout the day, we can't delete it from the archive service right away, or we'd invalidate the last-known snapshot. + * Instead, we have to do it after a backup is taken. This job looks at [BackupMediaSnapshotTable] in order to determine which media objects + * can be safely deleted from the archive service. + */ +class ArchiveCommitAttachmentDeletesJob private constructor(parameters: Parameters) : Job(parameters) { + + companion object { + private val TAG = Log.tag(ArchiveCommitAttachmentDeletesJob::class.java) + + const val KEY = "ArchiveCommitAttachmentDeletesJob" + const val ARCHIVE_ATTACHMENT_QUEUE = "ArchiveAttachmentQueue" + + private const val REMOTE_DELETE_BATCH_SIZE = 1_000 + + /** + * Deletes the provided attachments from the CDN. 
+ * + * @return Null if every chunk was deleted successfully, or the [Result] the calling job should return on the first failure or cancellation. + */ + fun deleteMediaObjectsFromCdn(tag: String, attachmentsToDelete: Set, backoffGenerator: () -> Long, cancellationSignal: () -> Boolean): Result? { + attachmentsToDelete.chunked(REMOTE_DELETE_BATCH_SIZE).forEach { chunk -> + if (cancellationSignal()) { + Log.w(tag, "Job cancelled while deleting attachments from the CDN.") + return Result.failure() + } + + when (val result = BackupRepository.deleteAbandonedMediaObjects(chunk)) { + is NetworkResult.Success -> { + Log.i(tag, "Successfully deleted ${chunk.size} attachments off of the CDN.") + } + + is NetworkResult.NetworkError -> { + return Result.retry(backoffGenerator()) + } + + is NetworkResult.StatusCodeError -> { + when (result.code) { + 429 -> { + Log.w(tag, "Rate limited while attempting to delete media objects. Retrying later.") + return Result.retry(result.retryAfter()?.inWholeMilliseconds ?: backoffGenerator()) + } + + in 500..599 -> { + Log.w(tag, "Failed to delete attachments from CDN with code: ${result.code}. Retrying with a larger backoff.", result.getCause()) + return Result.retry(1.hours.inWholeMilliseconds) + } + + else -> { + Log.w(tag, "Failed to delete attachments from CDN with code: ${result.code}. Considering this a terminal failure.", result.getCause()) + return Result.failure() + } + } + } + + is NetworkResult.ApplicationError -> { + Log.w(tag, "Crash when trying to delete attachments from the CDN", result.getCause()) + return Result.fatalFailure(RuntimeException(result.getCause())) + } + } + } + + return null + } + } + + constructor() : this( + parameters = Parameters.Builder() + .setQueue(ARCHIVE_ATTACHMENT_QUEUE) + .setMaxInstancesForQueue(1) + .setLifespan(30.days.inWholeMilliseconds) + .setMaxAttempts(Parameters.UNLIMITED) + .build() + ) + + override fun serialize(): ByteArray? 
= null + + override fun getFactoryKey(): String = KEY + + override fun run(): Result { + var mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(REMOTE_DELETE_BATCH_SIZE) + + while (mediaObjects.isNotEmpty()) { + if (isCanceled) { + Log.w(TAG, "Job cancelled while processing media objects for deletion.") + return Result.failure() + } + + deleteMediaObjectsFromCdn(TAG, mediaObjects, this::defaultBackoff, this::isCanceled)?.let { result -> return result } + SignalDatabase.backupMediaSnapshots.deleteOldMediaObjects(mediaObjects) + + mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(REMOTE_DELETE_BATCH_SIZE) + } + + return Result.success() + } + + override fun onFailure() = Unit + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveCommitAttachmentDeletesJob { + return ArchiveCommitAttachmentDeletesJob(parameters) + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt deleted file mode 100644 index 79de37c4d7..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMediaSnapshotSyncJob.kt +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.thoughtcrime.securesms.jobs - -import org.signal.core.util.forEach -import org.signal.core.util.logging.Log -import org.signal.core.util.nullIfBlank -import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject -import org.thoughtcrime.securesms.backup.v2.BackupRepository -import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable -import org.thoughtcrime.securesms.database.SignalDatabase -import org.thoughtcrime.securesms.dependencies.AppDependencies -import org.thoughtcrime.securesms.jobmanager.Job -import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint -import 
org.thoughtcrime.securesms.jobs.protos.BackupMediaSnapshotSyncJobData -import org.thoughtcrime.securesms.keyvalue.SignalStore -import org.whispersystems.signalservice.api.NetworkResult -import org.whispersystems.signalservice.api.archive.ArchiveGetMediaItemsResponse -import java.lang.RuntimeException -import kotlin.time.Duration.Companion.days -import kotlin.time.Duration.Companion.hours - -/** - * When we delete attachments locally, we can't immediately delete them from the archive CDN. This is because there is still a backup that exists that - * references that attachment -- at least until a new backup is made. - * - * This job uses data we store locally in [org.thoughtcrime.securesms.database.BackupMediaSnapshotTable] to determine which media objects can be safely - * deleted from the archive CDN, and then deletes them. - */ -class BackupMediaSnapshotSyncJob private constructor( - private val syncTime: Long, - private var serverCursor: String?, - parameters: Parameters -) : Job(parameters) { - - companion object { - - private val TAG = Log.tag(BackupMediaSnapshotSyncJob::class) - - const val KEY = "BackupMediaSnapshotSyncJob" - - private const val REMOTE_DELETE_BATCH_SIZE = 750 - private const val CDN_PAGE_SIZE = 10_000 - private val BACKUP_MEDIA_SYNC_INTERVAL = 7.days.inWholeMilliseconds - - fun enqueue(syncTime: Long) { - AppDependencies.jobManager.add( - BackupMediaSnapshotSyncJob( - syncTime = syncTime, - serverCursor = null, - parameters = Parameters.Builder() - .addConstraint(NetworkConstraint.KEY) - .setQueue("BackupMediaSnapshotSyncJob") - .setMaxAttempts(Parameters.UNLIMITED) - .setLifespan(12.hours.inWholeMilliseconds) - .build() - ) - ) - } - } - - override fun serialize(): ByteArray = BackupMediaSnapshotSyncJobData(syncTime, serverCursor ?: "").encode() - - override fun getFactoryKey(): String = KEY - - override fun run(): Result { - if (serverCursor == null) { - removeLocallyDeletedAttachmentsFromCdn()?.let { result -> return result } - } else { - 
Log.d(TAG, "Already deleted old attachments from CDN. Skipping to syncing with remote.") - } - - val timeSinceLastRemoteSync = System.currentTimeMillis() - SignalStore.backup.lastMediaSyncTime - if (serverCursor == null && timeSinceLastRemoteSync > 0 && timeSinceLastRemoteSync < BACKUP_MEDIA_SYNC_INTERVAL) { - Log.d(TAG, "No need to do a remote sync yet. Time since last sync: $timeSinceLastRemoteSync ms") - return Result.success() - } - - return syncDataFromCdn() ?: Result.success() - } - - override fun onFailure() = Unit - - /** - * Looks through our local snapshot of what attachments we put in the last backup file, and uses that to delete any old attachments from the archive CDN - * that we no longer need. - */ - private fun removeLocallyDeletedAttachmentsFromCdn(): Result? { - var mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(REMOTE_DELETE_BATCH_SIZE) - - while (mediaObjects.isNotEmpty()) { - deleteMediaObjectsFromCdn(mediaObjects)?.let { result -> return result } - SignalDatabase.backupMediaSnapshots.deleteOldMediaObjects(mediaObjects) - - mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(CDN_PAGE_SIZE) - } - - return null - } - - /** - * Fetches all attachment metadata from the archive CDN and ensures that our local store is in sync with it. - * - * Specifically, we make sure that: - * (1) We delete any attachments from the CDN that we have no knowledge of in any backup. - * (2) We ensure that our local store has the correct CDN for any attachments on the CDN (they should only really fall out of sync when you restore a backup - * that was made before all of the attachments had been uploaded). - */ - private fun syncDataFromCdn(): Result? { - val attachmentsToDelete = HashSet() - var cursor: String? 
= serverCursor - - do { - val (archivedItemPage, jobResult) = getRemoteArchiveItemPage(cursor) - if (jobResult != null) { - return jobResult - } - check(archivedItemPage != null) - - cursor = archivedItemPage.cursor - attachmentsToDelete += syncCdnPage(archivedItemPage) - - if (attachmentsToDelete.size >= REMOTE_DELETE_BATCH_SIZE) { - deleteMediaObjectsFromCdn(attachmentsToDelete)?.let { result -> return result } - attachmentsToDelete.clear() - } - - // We don't persist attachmentsToDelete, so we can only update the persisted serverCursor if there's no pending deletes - if (attachmentsToDelete.isEmpty()) { - serverCursor = archivedItemPage.cursor - } - } while (cursor != null) - - if (attachmentsToDelete.isNotEmpty()) { - deleteMediaObjectsFromCdn(attachmentsToDelete)?.let { result -> return result } - } - - val entriesNeedingRepairCursor = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(syncTime) - val needRepairCount = entriesNeedingRepairCursor.count - - if (needRepairCount > 0) { - Log.w(TAG, "Found $needRepairCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.") - - entriesNeedingRepairCursor.forEach { - val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it) - // TODO [backup] Re-enqueue thumbnail uploads if necessary - if (!entry.isThumbnail) { - SignalDatabase.attachments.resetArchiveTransferStateByDigest(entry.digest) - } - } - - BackupMessagesJob.enqueue() - } else { - Log.d(TAG, "No attachments need to be repaired.") - } - - SignalStore.backup.lastMediaSyncTime = System.currentTimeMillis() - - return null - } - - /** - * Update CDNs of archived media items. Returns list of objects that don't match - * to a local attachment DB row. 
- */ - private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse): List { - val mediaObjects = archivedItemPage.storedMediaObjects.map { - ArchivedMediaObject( - mediaId = it.mediaId, - cdn = it.cdn - ) - } - - SignalDatabase.backupMediaSnapshots.markSeenOnRemote( - mediaIdBatch = mediaObjects.map { it.mediaId }, - snapshotVersion = syncTime - ) - - val notFoundMediaObjects = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(mediaObjects) - val remainingObjects = mediaObjects - notFoundMediaObjects - - val cdnMismatches = SignalDatabase.backupMediaSnapshots.getMediaObjectsWithNonMatchingCdn(remainingObjects) - if (cdnMismatches.isNotEmpty()) { - Log.w(TAG, "Found ${cdnMismatches.size} items with CDNs that differ from what we have locally. Updating our local store.") - for (mismatch in cdnMismatches) { - SignalDatabase.attachments.setArchiveCdnByDigest(mismatch.digest, mismatch.cdn) - } - } - - return notFoundMediaObjects - } - - private fun getRemoteArchiveItemPage(cursor: String?): Pair { - return when (val result = BackupRepository.listRemoteMediaObjects(100, cursor)) { - is NetworkResult.Success -> result.result to null - is NetworkResult.NetworkError -> return null to Result.retry(defaultBackoff()) - is NetworkResult.StatusCodeError -> { - if (result.code == 429) { - Log.w(TAG, "Rate limited while attempting to list media objects. Retrying later.") - return null to Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) - } else { - Log.w(TAG, "Failed to list remote media objects with code: ${result.code}. Unable to proceed.", result.getCause()) - return null to Result.failure() - } - } - is NetworkResult.ApplicationError -> { - Log.w(TAG, "Failed to list remote media objects due to a crash.", result.getCause()) - return null to Result.fatalFailure(RuntimeException(result.getCause())) - } - } - } - - private fun deleteMediaObjectsFromCdn(attachmentsToDelete: Set): Result? 
{ - when (val result = BackupRepository.deleteAbandonedMediaObjects(attachmentsToDelete)) { - is NetworkResult.Success -> { - Log.i(TAG, "Successfully deleted ${attachmentsToDelete.size} attachments off of the CDN.") - } - is NetworkResult.NetworkError -> { - return Result.retry(defaultBackoff()) - } - is NetworkResult.StatusCodeError -> { - if (result.code == 429) { - Log.w(TAG, "Rate limited while attempting to delete media objects. Retrying later.") - return Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) - } else { - Log.w(TAG, "Failed to delete attachments from CDN with code: ${result.code}. Not failing job, just skipping and trying next page.", result.getCause()) - } - } - else -> { - Log.w(TAG, "Crash when trying to delete attachments from the CDN", result.getCause()) - return Result.fatalFailure(RuntimeException(result.getCause())) - } - } - - return null - } - - class Factory : Job.Factory { - override fun create(parameters: Parameters, serializedData: ByteArray?): BackupMediaSnapshotSyncJob { - val data = BackupMediaSnapshotSyncJobData.ADAPTER.decode(serializedData!!) 
- - return BackupMediaSnapshotSyncJob( - syncTime = data.syncTime, - serverCursor = data.serverCursor.nullIfBlank(), - parameters = parameters - ) - } - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt index bce89cd9e1..7921c21804 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt @@ -194,7 +194,10 @@ class BackupMessagesJob private constructor( SignalStore.backup.clearMessageBackupFailure() SignalDatabase.backupMediaSnapshots.commitPendingRows() - BackupMediaSnapshotSyncJob.enqueue(currentTime) + + AppDependencies.jobManager.add(ArchiveCommitAttachmentDeletesJob()) + AppDependencies.jobManager.add(ArchiveAttachmentReconciliationJob()) + return Result.success() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 15327ec079..b530c6739d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -119,6 +119,8 @@ public final class JobManagerFactories { put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory()); put(ApkUpdateJob.KEY, new ApkUpdateJob.Factory()); put(ArchiveAttachmentBackfillJob.KEY, new ArchiveAttachmentBackfillJob.Factory()); + put(ArchiveAttachmentReconciliationJob.KEY, new ArchiveAttachmentReconciliationJob.Factory()); + put(ArchiveCommitAttachmentDeletesJob.KEY, new ArchiveCommitAttachmentDeletesJob.Factory()); put(ArchiveThumbnailUploadJob.KEY, new ArchiveThumbnailUploadJob.Factory()); put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory()); put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory()); @@ -285,7 +287,6 @@ public final class JobManagerFactories { 
put(BackfillDigestsMigrationJob.KEY, new BackfillDigestsMigrationJob.Factory()); put(BackfillDigestsForDuplicatesMigrationJob.KEY, new BackfillDigestsForDuplicatesMigrationJob.Factory()); put(BackupJitterMigrationJob.KEY, new BackupJitterMigrationJob.Factory()); - put(BackupMediaSnapshotSyncJob.KEY, new BackupMediaSnapshotSyncJob.Factory()); put(BackupNotificationMigrationJob.KEY, new BackupNotificationMigrationJob.Factory()); put(BackupRefreshJob.KEY, new BackupRefreshJob.Factory()); put(BadE164MigrationJob.KEY, new BadE164MigrationJob.Factory()); @@ -390,6 +391,7 @@ public final class JobManagerFactories { put("SendGiftJob", new FailingJob.Factory()); put("InactiveGroupCheckMigrationJob", new PassingMigrationJob.Factory()); put("AttachmentMarkUploadedJob", new FailingJob.Factory()); + put("BackupMediaSnapshotSyncJob", new FailingJob.Factory()); }}; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt index f2584a5033..65ced3cb97 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/BackupValues.kt @@ -45,7 +45,7 @@ class BackupValues(store: KeyValueStore) : SignalStoreValues(store) { private const val KEY_NEXT_BACKUP_TIME = "backup.nextBackupTime" private const val KEY_LAST_BACKUP_TIME = "backup.lastBackupTime" - private const val KEY_LAST_BACKUP_MEDIA_SYNC_TIME = "backup.lastBackupMediaSyncTime" + private const val KEY_LAST_ATTACHMENT_RECONCILIATION_TIME = "backup.lastBackupMediaSyncTime" private const val KEY_TOTAL_RESTORABLE_ATTACHMENT_SIZE = "backup.totalRestorableAttachmentSize" private const val KEY_BACKUP_FREQUENCY = "backup.backupFrequency" @@ -112,7 +112,7 @@ class BackupValues(store: KeyValueStore) : SignalStoreValues(store) { val daysSinceLastBackup: Int get() = (System.currentTimeMillis().milliseconds - lastBackupTime.milliseconds).inWholeDays.toInt() - var 
lastMediaSyncTime: Long by longValue(KEY_LAST_BACKUP_MEDIA_SYNC_TIME, -1) + var lastAttachmentReconciliationTime: Long by longValue(KEY_LAST_ATTACHMENT_RECONCILIATION_TIME, -1) var backupFrequency: BackupFrequency by enumValue(KEY_BACKUP_FREQUENCY, BackupFrequency.DAILY, BackupFrequency.Serializer) var userManuallySkippedMediaRestore: Boolean by booleanValue(KEY_USER_MANUALLY_SKIPPED_MEDIA_RESTORE, false) diff --git a/app/src/main/java/org/thoughtcrime/securesms/logsubmit/LogSectionRemoteBackups.kt b/app/src/main/java/org/thoughtcrime/securesms/logsubmit/LogSectionRemoteBackups.kt index 526320c282..773605cf1a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/logsubmit/LogSectionRemoteBackups.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/logsubmit/LogSectionRemoteBackups.kt @@ -26,7 +26,7 @@ class LogSectionRemoteBackups : LogSection { output.append("Latest tier: ${SignalStore.backup.latestBackupTier}\n") output.append("Last backup time: ${SignalStore.backup.lastBackupTime}\n") output.append("Last check-in: ${SignalStore.backup.lastCheckInMillis}\n") - output.append("Last media sync: ${SignalStore.backup.lastMediaSyncTime}\n") + output.append("Last media sync: ${SignalStore.backup.lastAttachmentReconciliationTime}\n") output.append("Days since last backup: ${SignalStore.backup.daysSinceLastBackup}\n") output.append("User manually skipped media restore: ${SignalStore.backup.userManuallySkippedMediaRestore}\n") output.append("Can backup with cellular: ${SignalStore.backup.backupWithCellular}\n") diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/RemoteConfig.kt b/app/src/main/java/org/thoughtcrime/securesms/util/RemoteConfig.kt index f6aedb6ffd..fcf90c3d70 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/RemoteConfig.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/util/RemoteConfig.kt @@ -30,10 +30,13 @@ import kotlin.concurrent.withLock import kotlin.math.max import kotlin.math.min import kotlin.reflect.KProperty 
+import kotlin.time.Duration import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds +import kotlin.time.DurationUnit +import kotlin.time.toDuration /** * A location for accessing remotely-configured values. @@ -428,6 +431,24 @@ object RemoteConfig { ) } + private fun remoteDuration( + key: String, + defaultValue: Duration, + hotSwappable: Boolean, + durationUnit: DurationUnit, + active: Boolean = true, + onChangeListener: OnFlagChange? = null + ): Config { + return remoteValue( + key = key, + hotSwappable = hotSwappable, + sticky = false, + active = active, + onChangeListener = onChangeListener, + transformer = { it?.toString()?.toLongOrNull()?.toDuration(durationUnit) ?: defaultValue } + ) + } + private fun remoteString( key: String, defaultValue: T, @@ -1125,5 +1146,13 @@ object RemoteConfig { inSeconds.seconds.inWholeMilliseconds } + @JvmStatic + val archiveReconciliationSyncInterval: Duration by remoteDuration( + key = "android.archiveReconciliationSyncInterval", + defaultValue = 7.days, + hotSwappable = true, + durationUnit = DurationUnit.DAYS + ) + // endregion } diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto index 2818a5c97b..d8a133e735 100644 --- a/app/src/main/protowire/JobData.proto +++ b/app/src/main/protowire/JobData.proto @@ -141,8 +141,8 @@ message UploadAttachmentToArchiveJobData { optional bool canReuseUpload = 3; } -message BackupMediaSnapshotSyncJobData { - uint64 syncTime = 1; +message ArchiveAttachmentReconciliationJobData { + optional uint64 snapshot = 1; string serverCursor = 2; } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt index c45faee718..0e917cce8c 100644 --- 
a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt @@ -276,6 +276,14 @@ class ArchiveApi( /** * Retrieves a page of media items in the user's archive. + * + * GET /v1/archives/media?limit={limit}&cursor={cursor} + * + * - 200: Success + * - 400: Bad request, or made on an authenticated channel + * - 403: Forbidden + * - 429: Rate-limited + * * @param limit The maximum number of items to return. * @param cursor A token that can be read from your previous response, telling the server where to start the next page. */ @@ -291,13 +299,15 @@ class ArchiveApi( /** * Copy and re-encrypt media from the attachments cdn into the backup cdn. * - * Possible errors: - * 400: Bad arguments, or made on an authenticated channel - * 401: Invalid presentation or signature - * 403: Insufficient permissions - * 410: The source object was not found - * 413: No media space remaining - * 429: Rate-limited + * PUT /v1/archives/media + * + * - 200: Success + * - 400: Bad arguments, or made on an authenticated channel + * - 401: Invalid presentation or signature + * - 403: Insufficient permissions + * - 410: The source object was not found + * - 413: No media space remaining + * - 429: Rate-limited */ fun copyAttachmentToArchive( aci: ACI,