diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index 367706530a..52cc26740f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -224,6 +224,22 @@ object BackupRepository { Log.d(TAG, "import() ${eventTimer.stop().summary}") } + fun listRemoteMediaObjects(limit: Int, cursor: String? = null): NetworkResult { + val api = ApplicationDependencies.getSignalServiceAccountManager().archiveApi + val backupKey = SignalStore.svr().getOrCreateMasterKey().deriveBackupKey() + + return api + .triggerBackupIdReservation(backupKey) + .then { getAuthCredential() } + .then { credential -> + api.setPublicKey(backupKey, credential) + .map { credential } + } + .then { credential -> + api.getArchiveMediaItemsPage(backupKey, credential, limit, cursor) + } + } + /** * Returns an object with details about the remote backup state. */ @@ -420,6 +436,34 @@ object BackupRepository { .also { Log.i(TAG, "deleteArchivedMediaResult: $it") } } + fun deleteAbandonedMediaObjects(mediaObjects: Collection): NetworkResult { + val api = ApplicationDependencies.getSignalServiceAccountManager().archiveApi + val backupKey = SignalStore.svr().getOrCreateMasterKey().deriveBackupKey() + + val mediaToDelete = mediaObjects + .map { + DeleteArchivedMediaRequest.ArchivedMediaObject( + cdn = it.cdn, + mediaId = it.mediaId + ) + } + + if (mediaToDelete.isEmpty()) { + Log.i(TAG, "No media to delete, quick success") + return NetworkResult.Success(Unit) + } + + return getAuthCredential() + .then { credential -> + api.deleteArchivedMedia( + backupKey = backupKey, + serviceCredential = credential, + mediaToDelete = mediaToDelete + ) + } + .also { Log.i(TAG, "deleteAbandonedMediaObjectsResult: $it") } + } + fun debugDeleteAllArchivedMedia(): NetworkResult { val api = ApplicationDependencies.getSignalServiceAccountManager().archiveApi val backupKey = SignalStore.svr().getOrCreateMasterKey().deriveBackupKey() @@ -566,6 +610,8 @@ object BackupRepository { } } +data class ArchivedMediaObject(val mediaId: String, val cdn: Int) + data class BackupDirectories(val backupDir: String, val mediaDir: String) class ExportState(val backupTime: Long, val allowMediaBackup: Boolean) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ui/MessageBackupsTestRestoreViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ui/MessageBackupsTestRestoreViewModel.kt index fc3774f37c..071d0c5894 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ui/MessageBackupsTestRestoreViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/ui/MessageBackupsTestRestoreViewModel.kt @@ -15,14 +15,15 @@ import io.reactivex.rxjava3.disposables.CompositeDisposable import io.reactivex.rxjava3.kotlin.plusAssign import io.reactivex.rxjava3.kotlin.subscribeBy import io.reactivex.rxjava3.schedulers.Schedulers -import org.signal.core.util.orNull import org.signal.libsignal.zkgroup.profiles.ProfileKey import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.dependencies.ApplicationDependencies -import org.thoughtcrime.securesms.jobmanager.JobTracker import org.thoughtcrime.securesms.jobs.BackupRestoreJob +import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob +import org.thoughtcrime.securesms.jobs.SyncArchivedMediaJob import org.thoughtcrime.securesms.recipients.Recipient import java.io.InputStream +import kotlin.time.Duration.Companion.seconds class MessageBackupsTestRestoreViewModel : ViewModel() { val disposables = CompositeDisposable() @@ -47,8 +48,12 @@ class MessageBackupsTestRestoreViewModel : ViewModel() { fun restore() { _state.value = _state.value.copy(importState = ImportState.IN_PROGRESS) disposables += Single.fromCallable { - val jobState = ApplicationDependencies.getJobManager().runSynchronously(BackupRestoreJob(), 120_000) - jobState.orNull() == JobTracker.JobState.SUCCESS + ApplicationDependencies + .getJobManager() + .startChain(BackupRestoreJob()) + .then(SyncArchivedMediaJob()) + .then(BackupRestoreMediaJob()) + .enqueueAndBlockUntilCompletion(120.seconds.inWholeMilliseconds) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt index 91414a28f1..63787909a0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt @@ -30,6 +30,7 @@ import org.thoughtcrime.securesms.jobs.AttachmentUploadJob import org.thoughtcrime.securesms.jobs.BackupMessagesJob import org.thoughtcrime.securesms.jobs.BackupRestoreJob import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob +import org.thoughtcrime.securesms.jobs.SyncArchivedMediaJob import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.mms.IncomingMessage import org.thoughtcrime.securesms.recipients.Recipient @@ -168,6 +169,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() { ApplicationDependencies .getJobManager() .startChain(BackupRestoreJob()) + .then(SyncArchivedMediaJob()) .then(BackupRestoreMediaJob()) .enqueueAndBlockUntilCompletion(120.seconds.inWholeMilliseconds) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index 149bab4270..e10954a671 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -250,7 +250,8 @@ class AttachmentTable( "CREATE INDEX IF NOT EXISTS attachment_sticker_pack_id_index ON $TABLE_NAME ($STICKER_PACK_ID);", "CREATE INDEX IF NOT EXISTS attachment_data_hash_start_index ON $TABLE_NAME ($DATA_HASH_START);", "CREATE INDEX IF NOT EXISTS attachment_data_hash_end_index ON $TABLE_NAME ($DATA_HASH_END);", - "CREATE INDEX IF NOT EXISTS attachment_data_index ON $TABLE_NAME ($DATA_FILE);" + "CREATE INDEX IF NOT EXISTS attachment_data_index ON $TABLE_NAME ($DATA_FILE);", + "CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON $TABLE_NAME ($ARCHIVE_MEDIA_ID);" ) val ATTACHMENT_POINTER_REUSE_THRESHOLD = 7.days.inWholeMilliseconds @@ -1300,6 +1301,16 @@ class AttachmentTable( .run() } + fun updateArchiveCdnByMediaId(archiveMediaId: String, archiveCdn: Int): Int { + return writableDatabase + .update(TABLE_NAME) + .values( + ARCHIVE_CDN to archiveCdn + ) + .where("$ARCHIVE_MEDIA_ID = ?", archiveMediaId) + .run() + } + fun clearArchiveData(attachmentIds: List) { SqlUtil.buildCollectionQuery(ID, attachmentIds.map { it.id }) .forEach { query -> diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt index cc1d3741aa..9cffce8811 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SignalDatabaseMigrations.kt @@ -83,6 +83,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V222_DataHashRefact import org.thoughtcrime.securesms.database.helpers.migration.V223_AddNicknameAndNoteFieldsToRecipientTable import org.thoughtcrime.securesms.database.helpers.migration.V224_AddAttachmentArchiveColumns import org.thoughtcrime.securesms.database.helpers.migration.V225_AddLocalUserJoinedStateAndGroupCallActiveState +import org.thoughtcrime.securesms.database.helpers.migration.V226_AddAttachmentMediaIdIndex /** * Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness. @@ -168,10 +169,11 @@ object SignalDatabaseMigrations { 222 to V222_DataHashRefactor, 223 to V223_AddNicknameAndNoteFieldsToRecipientTable, 224 to V224_AddAttachmentArchiveColumns, - 225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState + 225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState, + 226 to V226_AddAttachmentMediaIdIndex ) - const val DATABASE_VERSION = 225 + const val DATABASE_VERSION = 226 @JvmStatic fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V226_AddAttachmentMediaIdIndex.kt b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V226_AddAttachmentMediaIdIndex.kt new file mode 100644 index 0000000000..fd89b97879 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/migration/V226_AddAttachmentMediaIdIndex.kt @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.database.helpers.migration + +import android.app.Application +import net.zetetic.database.sqlcipher.SQLiteDatabase + +/** + * Adds index to archive_media_id + */ +@Suppress("ClassName") +object V226_AddAttachmentMediaIdIndex : SignalDatabaseMigration { + override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) { + db.execSQL("CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON attachment (archive_media_id);") + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 2e6c57546f..01553fac8f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -223,6 +223,7 @@ public final class JobManagerFactories { put(StoryOnboardingDownloadJob.KEY, new StoryOnboardingDownloadJob.Factory()); put(SubmitRateLimitPushChallengeJob.KEY, new SubmitRateLimitPushChallengeJob.Factory()); put(Svr2MirrorJob.KEY, new Svr2MirrorJob.Factory()); + put(SyncArchivedMediaJob.KEY, new SyncArchivedMediaJob.Factory()); put(ThreadUpdateJob.KEY, new ThreadUpdateJob.Factory()); put(TrimThreadJob.KEY, new TrimThreadJob.Factory()); put(TypingSendJob.KEY, new TypingSendJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/SyncArchivedMediaJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/SyncArchivedMediaJob.kt new file mode 100644 index 0000000000..45fd928d09 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/SyncArchivedMediaJob.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import org.signal.core.util.logging.Log +import org.signal.core.util.withinTransaction +import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject +import org.thoughtcrime.securesms.backup.v2.BackupRepository +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.JsonJobData +import org.whispersystems.signalservice.api.archive.ArchiveGetMediaItemsResponse +import org.whispersystems.signalservice.api.push.exceptions.NetworkFailureException +import java.lang.Exception + +/** + * Job responsible for keeping remote archive media objects in sync. That is + * we make sure our CDN number aligns on all media ids, as well as deleting any + * extra media ids that we don't know about. + */ +class SyncArchivedMediaJob private constructor( + parameters: Parameters, + private var jobCursor: String? +) : BaseJob(parameters) { + + companion object { + private val TAG = Log.tag(BackupRestoreMediaJob::class.java) + + private const val KEY_CURSOR = "cursor" + + const val KEY = "SyncArchivedMediaJob" + } + + constructor(cursor: String? = null) : this( + Parameters.Builder() + .setQueue("SyncArchivedMedia") + .setMaxAttempts(Parameters.UNLIMITED) + .setMaxInstancesForQueue(2) + .build(), + cursor + ) + + override fun serialize(): ByteArray? { + return JsonJobData.Builder() + .putString(KEY_CURSOR, jobCursor) + .serialize() + } + + override fun getFactoryKey(): String = KEY + + override fun onFailure() = Unit + + override fun onRun() { + val batchSize = 100 + val attachmentsToDelete = HashSet() + var cursor: String? = jobCursor + do { + val archivedItemPage = BackupRepository.listRemoteMediaObjects(batchSize, cursor).successOrThrow() + attachmentsToDelete += syncPage(archivedItemPage) + cursor = archivedItemPage.cursor + if (attachmentsToDelete.size >= batchSize) { + BackupRepository.deleteAbandonedMediaObjects(attachmentsToDelete) + Log.i(TAG, "Deleted ${attachmentsToDelete.size} attachments off CDN") + attachmentsToDelete.clear() + } + if (attachmentsToDelete.isEmpty()) { + jobCursor = archivedItemPage.cursor + } + } while (cursor != null) + + if (attachmentsToDelete.isNotEmpty()) { + BackupRepository.deleteAbandonedMediaObjects(attachmentsToDelete) + Log.i(TAG, "Deleted ${attachmentsToDelete.size} attachments off CDN") + } + } + + /** + * Update CDNs of archived media items. Returns set of objects that don't match + * to a local attachment DB row. + */ + private fun syncPage(archivedItemPage: ArchiveGetMediaItemsResponse): Set { + val abandonedObjects = HashSet() + SignalDatabase.rawDatabase.withinTransaction { + archivedItemPage.storedMediaObjects.forEach { storedMediaObject -> + val rows = SignalDatabase.attachments.updateArchiveCdnByMediaId(archiveMediaId = storedMediaObject.mediaId, archiveCdn = storedMediaObject.cdn) + if (rows == 0) { + abandonedObjects.add(ArchivedMediaObject(storedMediaObject.mediaId, storedMediaObject.cdn)) + } + } + } + return abandonedObjects + } + + override fun onShouldRetry(e: Exception): Boolean { + return e is NetworkFailureException + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): SyncArchivedMediaJob { + val data = JsonJobData.deserialize(serializedData) + return SyncArchivedMediaJob(parameters, if (data.hasString(KEY_CURSOR)) data.getString(KEY_CURSOR) else null) + } + } +} diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt index 1928bd5d6b..04ac52f092 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt @@ -112,6 +112,17 @@ class ArchiveApi( } } + /** + * Lists the media objects in the backup + */ + fun listMediaObjects(backupKey: BackupKey, serviceCredential: ArchiveServiceCredential, limit: Int, cursor: String? = null): NetworkResult { + return NetworkResult.fromFetch { + val zkCredential = getZkCredential(backupKey, serviceCredential) + val presentationData = CredentialPresentationData.from(backupKey, zkCredential, backupServerPublicParams) + pushServiceSocket.getArchiveMediaItemsPage(presentationData.toArchiveCredentialPresentation(), limit, cursor) + } + } + /** * Retrieves a resumable upload URL you can use to upload your main message backup file to cloud storage. */ @@ -136,15 +147,11 @@ class ArchiveApi( */ fun debugGetUploadedMediaItemMetadata(backupKey: BackupKey, serviceCredential: ArchiveServiceCredential): NetworkResult> { return NetworkResult.fromFetch { - val zkCredential = getZkCredential(backupKey, serviceCredential) - val presentationData = CredentialPresentationData.from(backupKey, zkCredential, backupServerPublicParams) - val credentialPresentation = presentationData.toArchiveCredentialPresentation() - val mediaObjects: MutableList = ArrayList() var cursor: String? = null do { - val response: ArchiveGetMediaItemsResponse = pushServiceSocket.getArchiveMediaItemsPage(credentialPresentation, 512, cursor) + val response: ArchiveGetMediaItemsResponse = getArchiveMediaItemsPage(backupKey, serviceCredential, 512, cursor).successOrThrow() mediaObjects += response.storedMediaObjects cursor = response.cursor } while (cursor != null) @@ -158,7 +165,7 @@ class ArchiveApi( * @param limit The maximum number of items to return. * @param cursor A token that can be read from your previous response, telling the server where to start the next page. */ - fun getArchiveMediaItemsPage(backupKey: BackupKey, serviceCredential: ArchiveServiceCredential, limit: Int, cursor: String): NetworkResult { + fun getArchiveMediaItemsPage(backupKey: BackupKey, serviceCredential: ArchiveServiceCredential, limit: Int, cursor: String?): NetworkResult { return NetworkResult.fromFetch { val zkCredential = getZkCredential(backupKey, serviceCredential) val presentationData = CredentialPresentationData.from(backupKey, zkCredential, backupServerPublicParams) diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveGetMediaItemsResponse.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveGetMediaItemsResponse.kt index a55b4d3bc3..a456fad647 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveGetMediaItemsResponse.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveGetMediaItemsResponse.kt @@ -12,6 +12,8 @@ import com.fasterxml.jackson.annotation.JsonProperty */ class ArchiveGetMediaItemsResponse( @JsonProperty val storedMediaObjects: List, + @JsonProperty val backupDir: String?, + @JsonProperty val mediaDir: String?, @JsonProperty val cursor: String? ) { class StoredMediaObject(