Split archive deletes and reconciliations into separate jobs.

Greyson Parrelli
2025-06-05 12:29:19 -04:00
parent e7115a3a71
commit 9860b990e5
12 changed files with 405 additions and 272 deletions

View File

@@ -20,6 +20,7 @@ import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobs.ArchiveCommitAttachmentDeletesJob
import org.thoughtcrime.securesms.jobs.ArchiveThumbnailUploadJob
import org.thoughtcrime.securesms.jobs.BackfillDigestJob
import org.thoughtcrime.securesms.jobs.UploadAttachmentToArchiveJob
@@ -108,6 +109,7 @@ object ArchiveUploadProgress {
}
AppDependencies.jobManager.cancelAllInQueue(BackfillDigestJob.QUEUE)
AppDependencies.jobManager.cancelAllInQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE)
UploadAttachmentToArchiveJob.getAllQueueKeys().forEach {
AppDependencies.jobManager.cancelAllInQueue(it)
}
@@ -123,7 +125,7 @@ object ArchiveUploadProgress {
Log.d(TAG, "Flushing job manager queue...")
AppDependencies.jobManager.flush()
val queues = setOf(BackfillDigestJob.QUEUE, ArchiveThumbnailUploadJob.KEY) + UploadAttachmentToArchiveJob.getAllQueueKeys()
val queues = setOf(BackfillDigestJob.QUEUE, ArchiveThumbnailUploadJob.KEY, ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) + UploadAttachmentToArchiveJob.getAllQueueKeys()
Log.d(TAG, "Waiting for cancelations to occur...")
while (!AppDependencies.jobManager.areQueuesEmpty(queues)) {
delay(1.seconds)

View File

@@ -195,9 +195,9 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
*
* We purposely allow pending items here -- so long as they were in the most recent complete snapshot, we want to keep them.
*/
fun getMediaObjectsThatCantBeFound(objects: List<ArchivedMediaObject>): List<ArchivedMediaObject> {
fun getMediaObjectsThatCantBeFound(objects: List<ArchivedMediaObject>): Set<ArchivedMediaObject> {
if (objects.isEmpty()) {
return emptyList()
return emptySet()
}
val queries: List<SqlUtil.Query> = SqlUtil.buildCollectionQuery(
@@ -220,7 +220,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
}
}
return objects.filterNot { foundObjects.contains(it.mediaId) }
return objects.filterNot { foundObjects.contains(it.mediaId) }.toSet()
}
/**
@@ -268,7 +268,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
}
/**
Get all media objects that were last seen on the remote server before the given time.
Get all media objects in the specified snapshot that were last seen on the CDN before that snapshot.
* This is used to find media objects that have not been seen on the CDN, even though they should be.
*
* The cursor contains rows that can be parsed into [MediaEntry] objects.
@@ -279,7 +279,7 @@ class BackupMediaSnapshotTable(context: Context, database: SignalDatabase) : Dat
return readableDatabase
.select(MEDIA_ID, CDN, REMOTE_DIGEST, IS_THUMBNAIL)
.from(TABLE_NAME)
.where("$LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION < $snapshotVersion AND $SNAPSHOT_VERSION = $MAX_VERSION")
.where("$LAST_SEEN_ON_REMOTE_SNAPSHOT_VERSION < $snapshotVersion AND $SNAPSHOT_VERSION = $snapshotVersion")
.run()
}
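
Returning a Set here (rather than a List) lets the caller use the "on the CDN but unknown locally" result both as a set-difference operand and as a direct argument to the Set-typed delete helper introduced in this commit. A minimal caller-side sketch, mirroring syncCdnPage() in the reconciliation job added below; archivedItemPage stands in for one page of the CDN listing:

// Illustrative sketch only -- names mirror ArchiveAttachmentReconciliationJob.syncCdnPage() below.
val pageObjects: List<ArchivedMediaObject> = archivedItemPage.storedMediaObjects.map {
  ArchivedMediaObject(mediaId = it.mediaId, cdn = it.cdn)
}

// Media present on the CDN that no local snapshot knows about -- orphans that should be deleted.
val orphans: Set<ArchivedMediaObject> =
  SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(pageObjects)

// The Set feeds straight into ArchiveCommitAttachmentDeletesJob.deleteMediaObjectsFromCdn(...),
// and everything else is what exists on both sides.
val knownOnBothSides: List<ArchivedMediaObject> = pageObjects - orphans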

View File

@@ -0,0 +1,216 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.forEach
import org.signal.core.util.logging.Log
import org.signal.core.util.nullIfBlank
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentReconciliationJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.util.RemoteConfig
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.archive.ArchiveGetMediaItemsResponse
import java.lang.RuntimeException
import kotlin.time.Duration.Companion.days
/**
* We do our best to keep our local attachments in sync with the archive CDN, but we still want to have a backstop that periodically
checks to make sure things are in sync, and corrects them if they aren't.
*
* Specifically, this job does three important things:
*
* 1. Ensures that orphaned attachments on the CDN (i.e. attachments that are on the CDN but are no longer tied to the most recent backup) are deleted.
* 2. Ensures that attachments we thought were uploaded to the CDN, but are no longer there, are re-uploaded.
3. Keeps the CDN numbers in sync with our local database. There are known situations after the initial restore where we don't yet know the CDN, and beyond the
initial restore, there's always the chance that something falls out of sync, so we may as well correct it here since we're fetching the data anyway.
*/
class ArchiveAttachmentReconciliationJob private constructor(
private var snapshotVersion: Long?,
private var serverCursor: String?,
parameters: Parameters
) : Job(parameters) {
companion object {
private val TAG = Log.tag(ArchiveAttachmentReconciliationJob::class)
const val KEY = "ArchiveAttachmentReconciliationJob"
private const val CDN_FETCH_LIMIT = 10_000
}
constructor() : this(
snapshotVersion = null,
serverCursor = null,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE)
.setMaxInstancesForQueue(2)
.setMaxAttempts(Parameters.UNLIMITED)
.setLifespan(1.days.inWholeMilliseconds)
.build()
)
override fun serialize(): ByteArray = ArchiveAttachmentReconciliationJobData(snapshotVersion, serverCursor ?: "").encode()
override fun getFactoryKey(): String = KEY
override fun run(): Result {
val timeSinceLastSync = System.currentTimeMillis() - SignalStore.backup.lastAttachmentReconciliationTime
if (serverCursor == null && timeSinceLastSync > 0 && timeSinceLastSync < RemoteConfig.archiveReconciliationSyncInterval.inWholeMilliseconds) {
Log.d(TAG, "No need to do a remote sync yet. Time since last sync: $timeSinceLastSync ms")
return Result.success()
}
// It's possible a new backup could be started while this job is running. If we don't keep a consistent view of the snapshot version, the logic
// we use to determine which attachments need to be re-uploaded could end up re-uploading attachments unnecessarily.
snapshotVersion = snapshotVersion ?: SignalDatabase.backupMediaSnapshots.getCurrentSnapshotVersion()
return syncDataFromCdn(snapshotVersion!!) ?: Result.success()
}
override fun onFailure() = Unit
/**
* Fetches all attachment metadata from the archive CDN and ensures that our local store is in sync with it.
*
* Specifically, we make sure that:
* (1) We delete any attachments from the CDN that we have no knowledge of in any backup.
* (2) We ensure that our local store has the correct CDN for any attachments on the CDN (they should only really fall out of sync when you restore a backup
* that was made before all of the attachments had been uploaded).
*/
private fun syncDataFromCdn(snapshotVersion: Long): Result? {
do {
if (isCanceled) {
Log.w(TAG, "Job cancelled while syncing archived attachments from the CDN.")
return Result.failure()
}
val (archivedItemPage, jobResult) = getRemoteArchiveItemPage(serverCursor)
if (jobResult != null) {
return jobResult
}
check(archivedItemPage != null)
syncCdnPage(archivedItemPage, snapshotVersion)?.let { return it }
serverCursor = archivedItemPage.cursor
} while (serverCursor != null)
if (isCanceled) {
Log.w(TAG, "Job cancelled while syncing archived attachments from the CDN.")
return Result.failure()
}
val mediaObjectsThatNeedReUpload = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(snapshotVersion)
val needReUploadCount = mediaObjectsThatNeedReUpload.count
if (needReUploadCount > 0) {
Log.w(TAG, "Found $needReUploadCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.")
mediaObjectsThatNeedReUpload.forEach {
val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it)
// TODO [backup] Re-enqueue thumbnail uploads if necessary
if (!entry.isThumbnail) {
SignalDatabase.attachments.resetArchiveTransferStateByDigest(entry.digest)
}
}
BackupMessagesJob.enqueue()
} else {
Log.d(TAG, "No attachments need to be repaired.")
}
SignalStore.backup.lastAttachmentReconciliationTime = System.currentTimeMillis()
return null
}
/**
* Given a page of archived media items, this method will:
* - Mark that page as seen on the remote.
* - Fix any CDN mismatches by updating our local store with the correct CDN.
* - Delete any orphaned attachments that are on the CDN but not in our local store.
*
* @return Null if successful, or a [Result] indicating the failure reason.
*/
private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse, currentSnapshotVersion: Long): Result? {
val mediaObjects = archivedItemPage.storedMediaObjects.map {
ArchivedMediaObject(
mediaId = it.mediaId,
cdn = it.cdn
)
}
SignalDatabase.backupMediaSnapshots.markSeenOnRemote(
mediaIdBatch = mediaObjects.map { it.mediaId },
snapshotVersion = currentSnapshotVersion
)
val mediaOnRemoteButNotLocal = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(mediaObjects)
val mediaObjectsOnBothRemoteAndLocal = mediaObjects - mediaOnRemoteButNotLocal
val cdnMismatches = SignalDatabase.backupMediaSnapshots.getMediaObjectsWithNonMatchingCdn(mediaObjectsOnBothRemoteAndLocal)
if (cdnMismatches.isNotEmpty()) {
Log.w(TAG, "Found ${cdnMismatches.size} items with CDNs that differ from what we have locally. Updating our local store.")
for (mismatch in cdnMismatches) {
SignalDatabase.attachments.setArchiveCdnByDigest(mismatch.digest, mismatch.cdn)
}
}
val deleteResult = ArchiveCommitAttachmentDeletesJob.deleteMediaObjectsFromCdn(TAG, mediaOnRemoteButNotLocal, this::defaultBackoff, this::isCanceled)
if (deleteResult != null) {
Log.w(TAG, "Failed to delete orphaned attachments from the CDN. Returning failure.")
return deleteResult
}
return null
}
/**
* Fetches a page of archived media items from the CDN.
*
* @param cursor The cursor to use for pagination, or null to start from the beginning.
* @return A pair containing the [ArchiveGetMediaItemsResponse] with a null [Result] on success, or a null response alongside a [Result] describing the failure.
*/
private fun getRemoteArchiveItemPage(cursor: String?): Pair<ArchiveGetMediaItemsResponse?, Result?> {
return when (val result = BackupRepository.listRemoteMediaObjects(CDN_FETCH_LIMIT, cursor)) {
is NetworkResult.Success -> result.result to null
is NetworkResult.NetworkError -> return null to Result.retry(defaultBackoff())
is NetworkResult.StatusCodeError -> {
if (result.code == 429) {
Log.w(TAG, "Rate limited while attempting to list media objects. Retrying later.")
return null to Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff())
} else {
Log.w(TAG, "Failed to list remote media objects with code: ${result.code}. Unable to proceed.", result.getCause())
return null to Result.failure()
}
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "Failed to list remote media objects due to a crash.", result.getCause())
return null to Result.fatalFailure(RuntimeException(result.getCause()))
}
}
}
class Factory : Job.Factory<ArchiveAttachmentReconciliationJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentReconciliationJob {
val data = ArchiveAttachmentReconciliationJobData.ADAPTER.decode(serializedData!!)
return ArchiveAttachmentReconciliationJob(
snapshotVersion = data.snapshot,
serverCursor = data.serverCursor.nullIfBlank(),
parameters = parameters
)
}
}
}
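
The job keeps its progress durable so a retried run resumes rather than restarts: serialize() persists the pinned snapshot version and the current CDN page cursor via the ArchiveAttachmentReconciliationJobData proto added later in this commit, and the Factory restores both. A small round-trip sketch, assuming the usual Wire-generated Kotlin API for the message:

// Hedged sketch of the resume state; field names come from the proto definition in this commit.
val persisted: ByteArray = ArchiveAttachmentReconciliationJobData(
  snapshot = 42L,          // snapshot version pinned when the job first ran
  serverCursor = "page-7"  // cursor pointing at the next CDN page to list
).encode()

val restored = ArchiveAttachmentReconciliationJobData.ADAPTER.decode(persisted)
// A retried job reconciles against the same snapshot and continues listing from the same page.
check(restored.snapshot == 42L && restored.serverCursor.nullIfBlank() == "page-7")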

View File

@@ -0,0 +1,123 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.whispersystems.signalservice.api.NetworkResult
import java.lang.RuntimeException
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
/**
* When we delete media throughout the day, we can't delete it from the archive service right away, or we'd invalidate the last-known snapshot.
* Instead, we have to do it after a backup is taken. This job looks at [BackupMediaSnapshotTable] in order to determine which media objects
* can be safely deleted from the archive service.
*/
class ArchiveCommitAttachmentDeletesJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
private val TAG = Log.tag(ArchiveCommitAttachmentDeletesJob::class.java)
const val KEY = "ArchiveCommitAttachmentDeletesJob"
const val ARCHIVE_ATTACHMENT_QUEUE = "ArchiveAttachmentQueue"
private const val REMOTE_DELETE_BATCH_SIZE = 1_000
/**
* Deletes the provided attachments from the CDN.
*
* @return Null if successful, or a [Result] indicating the failure.
*/
fun deleteMediaObjectsFromCdn(tag: String, attachmentsToDelete: Set<ArchivedMediaObject>, backoffGenerator: () -> Long, cancellationSignal: () -> Boolean): Result? {
attachmentsToDelete.chunked(REMOTE_DELETE_BATCH_SIZE).forEach { chunk ->
if (cancellationSignal()) {
Log.w(tag, "Job cancelled while deleting attachments from the CDN.")
return Result.failure()
}
when (val result = BackupRepository.deleteAbandonedMediaObjects(chunk)) {
is NetworkResult.Success -> {
Log.i(tag, "Successfully deleted ${chunk.size} attachments off of the CDN.")
}
is NetworkResult.NetworkError -> {
return Result.retry(backoffGenerator())
}
is NetworkResult.StatusCodeError -> {
when (result.code) {
429 -> {
Log.w(tag, "Rate limited while attempting to delete media objects. Retrying later.")
return Result.retry(result.retryAfter()?.inWholeMilliseconds ?: backoffGenerator())
}
in 500..599 -> {
Log.w(tag, "Failed to delete attachments from CDN with code: ${result.code}. Retrying with a larger backoff.", result.getCause())
return Result.retry(1.hours.inWholeMilliseconds)
}
else -> {
Log.w(tag, "Failed to delete attachments from CDN with code: ${result.code}. Considering this a terminal failure.", result.getCause())
return Result.failure()
}
}
}
is NetworkResult.ApplicationError -> {
Log.w(tag, "Crash when trying to delete attachments from the CDN", result.getCause())
return Result.fatalFailure(RuntimeException(result.getCause()))
}
}
}
return null
}
}
constructor() : this(
parameters = Parameters.Builder()
.setQueue(ARCHIVE_ATTACHMENT_QUEUE)
.setMaxInstancesForQueue(1)
.setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED)
.build()
)
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun run(): Result {
var mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(REMOTE_DELETE_BATCH_SIZE)
while (mediaObjects.isNotEmpty()) {
if (isCanceled) {
Log.w(TAG, "Job cancelled while processing media objects for deletion.")
return Result.failure()
}
deleteMediaObjectsFromCdn(TAG, mediaObjects, this::defaultBackoff, this::isCanceled)?.let { result -> return result }
SignalDatabase.backupMediaSnapshots.deleteOldMediaObjects(mediaObjects)
mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(REMOTE_DELETE_BATCH_SIZE)
}
return Result.success()
}
override fun onFailure() = Unit
class Factory : Job.Factory<ArchiveCommitAttachmentDeletesJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveCommitAttachmentDeletesJob {
return ArchiveCommitAttachmentDeletesJob(parameters)
}
}
}
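
deleteMediaObjectsFromCdn() lives on the companion object so the reconciliation job above can reuse the same batching, rate-limit, and error handling instead of duplicating it. A hedged sketch of that call as it would look from inside a Job subclass (orphans is an assumed Set built from the CDN listing):

// Usage sketch of the shared helper, as invoked from ArchiveAttachmentReconciliationJob.syncCdnPage().
val deleteResult: Result? = ArchiveCommitAttachmentDeletesJob.deleteMediaObjectsFromCdn(
  tag = TAG,                               // caller's log tag
  attachmentsToDelete = orphans,           // Set<ArchivedMediaObject> of CDN objects nobody references
  backoffGenerator = this::defaultBackoff, // caller supplies its own retry backoff
  cancellationSignal = this::isCanceled    // lets the helper stop between 1,000-item batches
)
if (deleteResult != null) {
  return deleteResult // propagate the retry / failure decision to the JobManager
}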

View File

@@ -1,252 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.forEach
import org.signal.core.util.logging.Log
import org.signal.core.util.nullIfBlank
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.BackupMediaSnapshotTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.BackupMediaSnapshotSyncJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.archive.ArchiveGetMediaItemsResponse
import java.lang.RuntimeException
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
/**
* When we delete attachments locally, we can't immediately delete them from the archive CDN. This is because there is still a backup that exists that
* references that attachment -- at least until a new backup is made.
*
* This job uses data we store locally in [org.thoughtcrime.securesms.database.BackupMediaSnapshotTable] to determine which media objects can be safely
* deleted from the archive CDN, and then deletes them.
*/
class BackupMediaSnapshotSyncJob private constructor(
private val syncTime: Long,
private var serverCursor: String?,
parameters: Parameters
) : Job(parameters) {
companion object {
private val TAG = Log.tag(BackupMediaSnapshotSyncJob::class)
const val KEY = "BackupMediaSnapshotSyncJob"
private const val REMOTE_DELETE_BATCH_SIZE = 750
private const val CDN_PAGE_SIZE = 10_000
private val BACKUP_MEDIA_SYNC_INTERVAL = 7.days.inWholeMilliseconds
fun enqueue(syncTime: Long) {
AppDependencies.jobManager.add(
BackupMediaSnapshotSyncJob(
syncTime = syncTime,
serverCursor = null,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setQueue("BackupMediaSnapshotSyncJob")
.setMaxAttempts(Parameters.UNLIMITED)
.setLifespan(12.hours.inWholeMilliseconds)
.build()
)
)
}
}
override fun serialize(): ByteArray = BackupMediaSnapshotSyncJobData(syncTime, serverCursor ?: "").encode()
override fun getFactoryKey(): String = KEY
override fun run(): Result {
if (serverCursor == null) {
removeLocallyDeletedAttachmentsFromCdn()?.let { result -> return result }
} else {
Log.d(TAG, "Already deleted old attachments from CDN. Skipping to syncing with remote.")
}
val timeSinceLastRemoteSync = System.currentTimeMillis() - SignalStore.backup.lastMediaSyncTime
if (serverCursor == null && timeSinceLastRemoteSync > 0 && timeSinceLastRemoteSync < BACKUP_MEDIA_SYNC_INTERVAL) {
Log.d(TAG, "No need to do a remote sync yet. Time since last sync: $timeSinceLastRemoteSync ms")
return Result.success()
}
return syncDataFromCdn() ?: Result.success()
}
override fun onFailure() = Unit
/**
* Looks through our local snapshot of what attachments we put in the last backup file, and uses that to delete any old attachments from the archive CDN
* that we no longer need.
*/
private fun removeLocallyDeletedAttachmentsFromCdn(): Result? {
var mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(REMOTE_DELETE_BATCH_SIZE)
while (mediaObjects.isNotEmpty()) {
deleteMediaObjectsFromCdn(mediaObjects)?.let { result -> return result }
SignalDatabase.backupMediaSnapshots.deleteOldMediaObjects(mediaObjects)
mediaObjects = SignalDatabase.backupMediaSnapshots.getPageOfOldMediaObjects(CDN_PAGE_SIZE)
}
return null
}
/**
* Fetches all attachment metadata from the archive CDN and ensures that our local store is in sync with it.
*
* Specifically, we make sure that:
* (1) We delete any attachments from the CDN that we have no knowledge of in any backup.
* (2) We ensure that our local store has the correct CDN for any attachments on the CDN (they should only really fall out of sync when you restore a backup
* that was made before all of the attachments had been uploaded).
*/
private fun syncDataFromCdn(): Result? {
val attachmentsToDelete = HashSet<ArchivedMediaObject>()
var cursor: String? = serverCursor
do {
val (archivedItemPage, jobResult) = getRemoteArchiveItemPage(cursor)
if (jobResult != null) {
return jobResult
}
check(archivedItemPage != null)
cursor = archivedItemPage.cursor
attachmentsToDelete += syncCdnPage(archivedItemPage)
if (attachmentsToDelete.size >= REMOTE_DELETE_BATCH_SIZE) {
deleteMediaObjectsFromCdn(attachmentsToDelete)?.let { result -> return result }
attachmentsToDelete.clear()
}
// We don't persist attachmentsToDelete, so we can only update the persisted serverCursor if there's no pending deletes
if (attachmentsToDelete.isEmpty()) {
serverCursor = archivedItemPage.cursor
}
} while (cursor != null)
if (attachmentsToDelete.isNotEmpty()) {
deleteMediaObjectsFromCdn(attachmentsToDelete)?.let { result -> return result }
}
val entriesNeedingRepairCursor = SignalDatabase.backupMediaSnapshots.getMediaObjectsLastSeenOnCdnBeforeSnapshotVersion(syncTime)
val needRepairCount = entriesNeedingRepairCursor.count
if (needRepairCount > 0) {
Log.w(TAG, "Found $needRepairCount attachments that we thought were uploaded, but could not be found on the CDN. Clearing state and enqueuing uploads.")
entriesNeedingRepairCursor.forEach {
val entry = BackupMediaSnapshotTable.MediaEntry.fromCursor(it)
// TODO [backup] Re-enqueue thumbnail uploads if necessary
if (!entry.isThumbnail) {
SignalDatabase.attachments.resetArchiveTransferStateByDigest(entry.digest)
}
}
BackupMessagesJob.enqueue()
} else {
Log.d(TAG, "No attachments need to be repaired.")
}
SignalStore.backup.lastMediaSyncTime = System.currentTimeMillis()
return null
}
/**
* Update CDNs of archived media items. Returns the list of objects that can't be matched
to a local attachment DB row.
*/
private fun syncCdnPage(archivedItemPage: ArchiveGetMediaItemsResponse): List<ArchivedMediaObject> {
val mediaObjects = archivedItemPage.storedMediaObjects.map {
ArchivedMediaObject(
mediaId = it.mediaId,
cdn = it.cdn
)
}
SignalDatabase.backupMediaSnapshots.markSeenOnRemote(
mediaIdBatch = mediaObjects.map { it.mediaId },
snapshotVersion = syncTime
)
val notFoundMediaObjects = SignalDatabase.backupMediaSnapshots.getMediaObjectsThatCantBeFound(mediaObjects)
val remainingObjects = mediaObjects - notFoundMediaObjects
val cdnMismatches = SignalDatabase.backupMediaSnapshots.getMediaObjectsWithNonMatchingCdn(remainingObjects)
if (cdnMismatches.isNotEmpty()) {
Log.w(TAG, "Found ${cdnMismatches.size} items with CDNs that differ from what we have locally. Updating our local store.")
for (mismatch in cdnMismatches) {
SignalDatabase.attachments.setArchiveCdnByDigest(mismatch.digest, mismatch.cdn)
}
}
return notFoundMediaObjects
}
private fun getRemoteArchiveItemPage(cursor: String?): Pair<ArchiveGetMediaItemsResponse?, Result?> {
return when (val result = BackupRepository.listRemoteMediaObjects(100, cursor)) {
is NetworkResult.Success -> result.result to null
is NetworkResult.NetworkError -> return null to Result.retry(defaultBackoff())
is NetworkResult.StatusCodeError -> {
if (result.code == 429) {
Log.w(TAG, "Rate limited while attempting to list media objects. Retrying later.")
return null to Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff())
} else {
Log.w(TAG, "Failed to list remote media objects with code: ${result.code}. Unable to proceed.", result.getCause())
return null to Result.failure()
}
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "Failed to list remote media objects due to a crash.", result.getCause())
return null to Result.fatalFailure(RuntimeException(result.getCause()))
}
}
}
private fun deleteMediaObjectsFromCdn(attachmentsToDelete: Set<ArchivedMediaObject>): Result? {
when (val result = BackupRepository.deleteAbandonedMediaObjects(attachmentsToDelete)) {
is NetworkResult.Success -> {
Log.i(TAG, "Successfully deleted ${attachmentsToDelete.size} attachments off of the CDN.")
}
is NetworkResult.NetworkError -> {
return Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
if (result.code == 429) {
Log.w(TAG, "Rate limited while attempting to delete media objects. Retrying later.")
return Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff())
} else {
Log.w(TAG, "Failed to delete attachments from CDN with code: ${result.code}. Not failing job, just skipping and trying next page.", result.getCause())
}
}
else -> {
Log.w(TAG, "Crash when trying to delete attachments from the CDN", result.getCause())
return Result.fatalFailure(RuntimeException(result.getCause()))
}
}
return null
}
class Factory : Job.Factory<BackupMediaSnapshotSyncJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): BackupMediaSnapshotSyncJob {
val data = BackupMediaSnapshotSyncJobData.ADAPTER.decode(serializedData!!)
return BackupMediaSnapshotSyncJob(
syncTime = data.syncTime,
serverCursor = data.serverCursor.nullIfBlank(),
parameters = parameters
)
}
}
}

View File

@@ -194,7 +194,10 @@ class BackupMessagesJob private constructor(
SignalStore.backup.clearMessageBackupFailure()
SignalDatabase.backupMediaSnapshots.commitPendingRows()
BackupMediaSnapshotSyncJob.enqueue(currentTime)
AppDependencies.jobManager.add(ArchiveCommitAttachmentDeletesJob())
AppDependencies.jobManager.add(ArchiveAttachmentReconciliationJob())
return Result.success()
}
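
Both replacement jobs declare the same ARCHIVE_ATTACHMENT_QUEUE, so -- assuming the JobManager runs jobs within a single queue serially, in enqueue order -- the pending deletes are committed before the reconciliation pass lists the CDN. The two calls above rely on that ordering:

// Hedged sketch: the ordering guarantee comes from the shared queue, not from the jobs themselves.
AppDependencies.jobManager.add(ArchiveCommitAttachmentDeletesJob())   // queue: ARCHIVE_ATTACHMENT_QUEUE
AppDependencies.jobManager.add(ArchiveAttachmentReconciliationJob())  // same queue, so it runs second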

View File

@@ -119,6 +119,8 @@ public final class JobManagerFactories {
put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory());
put(ApkUpdateJob.KEY, new ApkUpdateJob.Factory());
put(ArchiveAttachmentBackfillJob.KEY, new ArchiveAttachmentBackfillJob.Factory());
put(ArchiveAttachmentReconciliationJob.KEY, new ArchiveAttachmentReconciliationJob.Factory());
put(ArchiveCommitAttachmentDeletesJob.KEY, new ArchiveCommitAttachmentDeletesJob.Factory());
put(ArchiveThumbnailUploadJob.KEY, new ArchiveThumbnailUploadJob.Factory());
put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory());
put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory());
@@ -285,7 +287,6 @@ public final class JobManagerFactories {
put(BackfillDigestsMigrationJob.KEY, new BackfillDigestsMigrationJob.Factory());
put(BackfillDigestsForDuplicatesMigrationJob.KEY, new BackfillDigestsForDuplicatesMigrationJob.Factory());
put(BackupJitterMigrationJob.KEY, new BackupJitterMigrationJob.Factory());
put(BackupMediaSnapshotSyncJob.KEY, new BackupMediaSnapshotSyncJob.Factory());
put(BackupNotificationMigrationJob.KEY, new BackupNotificationMigrationJob.Factory());
put(BackupRefreshJob.KEY, new BackupRefreshJob.Factory());
put(BadE164MigrationJob.KEY, new BadE164MigrationJob.Factory());
@@ -390,6 +391,7 @@ public final class JobManagerFactories {
put("SendGiftJob", new FailingJob.Factory());
put("InactiveGroupCheckMigrationJob", new PassingMigrationJob.Factory());
put("AttachmentMarkUploadedJob", new FailingJob.Factory());
put("BackupMediaSnapshotSyncJob", new FailingJob.Factory());
}};
}

View File

@@ -45,7 +45,7 @@ class BackupValues(store: KeyValueStore) : SignalStoreValues(store) {
private const val KEY_NEXT_BACKUP_TIME = "backup.nextBackupTime"
private const val KEY_LAST_BACKUP_TIME = "backup.lastBackupTime"
private const val KEY_LAST_BACKUP_MEDIA_SYNC_TIME = "backup.lastBackupMediaSyncTime"
private const val KEY_LAST_ATTACHMENT_RECONCILIATION_TIME = "backup.lastBackupMediaSyncTime"
private const val KEY_TOTAL_RESTORABLE_ATTACHMENT_SIZE = "backup.totalRestorableAttachmentSize"
private const val KEY_BACKUP_FREQUENCY = "backup.backupFrequency"
@@ -112,7 +112,7 @@ class BackupValues(store: KeyValueStore) : SignalStoreValues(store) {
val daysSinceLastBackup: Int get() = (System.currentTimeMillis().milliseconds - lastBackupTime.milliseconds).inWholeDays.toInt()
var lastMediaSyncTime: Long by longValue(KEY_LAST_BACKUP_MEDIA_SYNC_TIME, -1)
var lastAttachmentReconciliationTime: Long by longValue(KEY_LAST_ATTACHMENT_RECONCILIATION_TIME, -1)
var backupFrequency: BackupFrequency by enumValue(KEY_BACKUP_FREQUENCY, BackupFrequency.DAILY, BackupFrequency.Serializer)
var userManuallySkippedMediaRestore: Boolean by booleanValue(KEY_USER_MANUALLY_SKIPPED_MEDIA_RESTORE, false)

View File

@@ -26,7 +26,7 @@ class LogSectionRemoteBackups : LogSection {
output.append("Latest tier: ${SignalStore.backup.latestBackupTier}\n")
output.append("Last backup time: ${SignalStore.backup.lastBackupTime}\n")
output.append("Last check-in: ${SignalStore.backup.lastCheckInMillis}\n")
output.append("Last media sync: ${SignalStore.backup.lastMediaSyncTime}\n")
output.append("Last media sync: ${SignalStore.backup.lastAttachmentReconciliationTime}\n")
output.append("Days since last backup: ${SignalStore.backup.daysSinceLastBackup}\n")
output.append("User manually skipped media restore: ${SignalStore.backup.userManuallySkippedMediaRestore}\n")
output.append("Can backup with cellular: ${SignalStore.backup.backupWithCellular}\n")

View File

@@ -30,10 +30,13 @@ import kotlin.concurrent.withLock
import kotlin.math.max
import kotlin.math.min
import kotlin.reflect.KProperty
import kotlin.time.Duration
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlin.time.DurationUnit
import kotlin.time.toDuration
/**
* A location for accessing remotely-configured values.
@@ -428,6 +431,24 @@ object RemoteConfig {
)
}
private fun remoteDuration(
key: String,
defaultValue: Duration,
hotSwappable: Boolean,
durationUnit: DurationUnit,
active: Boolean = true,
onChangeListener: OnFlagChange? = null
): Config<Duration> {
return remoteValue(
key = key,
hotSwappable = hotSwappable,
sticky = false,
active = active,
onChangeListener = onChangeListener,
transformer = { it?.toString()?.toLongOrNull()?.toDuration(durationUnit) ?: defaultValue }
)
}
private fun <T : String?> remoteString(
key: String,
defaultValue: T,
@@ -1125,5 +1146,13 @@ object RemoteConfig {
inSeconds.seconds.inWholeMilliseconds
}
@JvmStatic
val archiveReconciliationSyncInterval: Duration by remoteDuration(
key = "android.archiveReconciliationSyncInterval",
defaultValue = 7.days,
hotSwappable = true,
durationUnit = DurationUnit.DAYS
)
// endregion
}
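
The new remoteDuration() helper treats the remote value as a plain number in the supplied unit and falls back to the default when the value is missing or unparsable. A tiny sketch of the transformer's behavior ("14" is a made-up raw config value):

// Illustrative only: what the transformer inside remoteDuration() does with a raw config value.
val raw: Any? = "14" // hypothetical value delivered by the remote config map
val interval = raw?.toString()?.toLongOrNull()?.toDuration(DurationUnit.DAYS) ?: 7.days
check(interval == 14.days) // a missing or non-numeric value would yield the 7-day default instead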

View File

@@ -141,8 +141,8 @@ message UploadAttachmentToArchiveJobData {
optional bool canReuseUpload = 3;
}
message BackupMediaSnapshotSyncJobData {
uint64 syncTime = 1;
message ArchiveAttachmentReconciliationJobData {
optional uint64 snapshot = 1;
string serverCursor = 2;
}

View File

@@ -276,6 +276,14 @@ class ArchiveApi(
/**
* Retrieves a page of media items in the user's archive.
*
* GET /v1/archives/media?limit={limit}&cursor={cursor}
*
* - 200: Success
* - 400: Bad request, or made on authenticated channel
* - 403: Forbidden
* - 429: Rate-limited
*
* @param limit The maximum number of items to return.
* @param cursor A token that can be read from your previous response, telling the server where to start the next page.
*/
@@ -291,13 +299,15 @@ class ArchiveApi(
/**
* Copy and re-encrypt media from the attachments cdn into the backup cdn.
*
* Possible errors:
* 400: Bad arguments, or made on an authenticated channel
* 401: Invalid presentation or signature
* 403: Insufficient permissions
* 410: The source object was not found
* 413: No media space remaining
* 429: Rate-limited
* PUT /v1/archives/media
*
* - 200: Success
* - 400: Bad arguments, or made on an authenticated channel
* - 401: Invalid presentation or signature
* - 403: Insufficient permissions
* - 410: The source object was not found
* - 413: No media space remaining
* - 429: Rate-limited
*/
fun copyAttachmentToArchive(
aci: ACI,