Add archived media sync job.

This commit is contained in:
Clark
2024-04-16 15:01:16 -04:00
committed by Greyson Parrelli
parent 7a2d408ca2
commit d8bbfe2678
10 changed files with 215 additions and 13 deletions

View File

@@ -224,6 +224,22 @@ object BackupRepository {
Log.d(TAG, "import() ${eventTimer.stop().summary}")
}
fun listRemoteMediaObjects(limit: Int, cursor: String? = null): NetworkResult<ArchiveGetMediaItemsResponse> {
val api = ApplicationDependencies.getSignalServiceAccountManager().archiveApi
val backupKey = SignalStore.svr().getOrCreateMasterKey().deriveBackupKey()
return api
.triggerBackupIdReservation(backupKey)
.then { getAuthCredential() }
.then { credential ->
api.setPublicKey(backupKey, credential)
.map { credential }
}
.then { credential ->
api.getArchiveMediaItemsPage(backupKey, credential, limit, cursor)
}
}
/**
* Returns an object with details about the remote backup state.
*/
@@ -420,6 +436,34 @@ object BackupRepository {
.also { Log.i(TAG, "deleteArchivedMediaResult: $it") }
}
fun deleteAbandonedMediaObjects(mediaObjects: Collection<ArchivedMediaObject>): NetworkResult<Unit> {
val api = ApplicationDependencies.getSignalServiceAccountManager().archiveApi
val backupKey = SignalStore.svr().getOrCreateMasterKey().deriveBackupKey()
val mediaToDelete = mediaObjects
.map {
DeleteArchivedMediaRequest.ArchivedMediaObject(
cdn = it.cdn,
mediaId = it.mediaId
)
}
if (mediaToDelete.isEmpty()) {
Log.i(TAG, "No media to delete, quick success")
return NetworkResult.Success(Unit)
}
return getAuthCredential()
.then { credential ->
api.deleteArchivedMedia(
backupKey = backupKey,
serviceCredential = credential,
mediaToDelete = mediaToDelete
)
}
.also { Log.i(TAG, "deleteAbandonedMediaObjectsResult: $it") }
}
fun debugDeleteAllArchivedMedia(): NetworkResult<Unit> {
val api = ApplicationDependencies.getSignalServiceAccountManager().archiveApi
val backupKey = SignalStore.svr().getOrCreateMasterKey().deriveBackupKey()
@@ -566,6 +610,8 @@ object BackupRepository {
}
}
data class ArchivedMediaObject(val mediaId: String, val cdn: Int)
data class BackupDirectories(val backupDir: String, val mediaDir: String)
class ExportState(val backupTime: Long, val allowMediaBackup: Boolean) {

View File

@@ -15,14 +15,15 @@ import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.plusAssign
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.schedulers.Schedulers
import org.signal.core.util.orNull
import org.signal.libsignal.zkgroup.profiles.ProfileKey
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.JobTracker
import org.thoughtcrime.securesms.jobs.BackupRestoreJob
import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob
import org.thoughtcrime.securesms.jobs.SyncArchivedMediaJob
import org.thoughtcrime.securesms.recipients.Recipient
import java.io.InputStream
import kotlin.time.Duration.Companion.seconds
class MessageBackupsTestRestoreViewModel : ViewModel() {
val disposables = CompositeDisposable()
@@ -47,8 +48,12 @@ class MessageBackupsTestRestoreViewModel : ViewModel() {
fun restore() {
_state.value = _state.value.copy(importState = ImportState.IN_PROGRESS)
disposables += Single.fromCallable {
val jobState = ApplicationDependencies.getJobManager().runSynchronously(BackupRestoreJob(), 120_000)
jobState.orNull() == JobTracker.JobState.SUCCESS
ApplicationDependencies
.getJobManager()
.startChain(BackupRestoreJob())
.then(SyncArchivedMediaJob())
.then(BackupRestoreMediaJob())
.enqueueAndBlockUntilCompletion(120.seconds.inWholeMilliseconds)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

View File

@@ -30,6 +30,7 @@ import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.jobs.BackupMessagesJob
import org.thoughtcrime.securesms.jobs.BackupRestoreJob
import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob
import org.thoughtcrime.securesms.jobs.SyncArchivedMediaJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.mms.IncomingMessage
import org.thoughtcrime.securesms.recipients.Recipient
@@ -168,6 +169,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() {
ApplicationDependencies
.getJobManager()
.startChain(BackupRestoreJob())
.then(SyncArchivedMediaJob())
.then(BackupRestoreMediaJob())
.enqueueAndBlockUntilCompletion(120.seconds.inWholeMilliseconds)
}

View File

@@ -250,7 +250,8 @@ class AttachmentTable(
"CREATE INDEX IF NOT EXISTS attachment_sticker_pack_id_index ON $TABLE_NAME ($STICKER_PACK_ID);",
"CREATE INDEX IF NOT EXISTS attachment_data_hash_start_index ON $TABLE_NAME ($DATA_HASH_START);",
"CREATE INDEX IF NOT EXISTS attachment_data_hash_end_index ON $TABLE_NAME ($DATA_HASH_END);",
"CREATE INDEX IF NOT EXISTS attachment_data_index ON $TABLE_NAME ($DATA_FILE);"
"CREATE INDEX IF NOT EXISTS attachment_data_index ON $TABLE_NAME ($DATA_FILE);",
"CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON $TABLE_NAME ($ARCHIVE_MEDIA_ID);"
)
val ATTACHMENT_POINTER_REUSE_THRESHOLD = 7.days.inWholeMilliseconds
@@ -1300,6 +1301,16 @@ class AttachmentTable(
.run()
}
fun updateArchiveCdnByMediaId(archiveMediaId: String, archiveCdn: Int): Int {
return writableDatabase
.update(TABLE_NAME)
.values(
ARCHIVE_CDN to archiveCdn
)
.where("$ARCHIVE_MEDIA_ID = ?", archiveMediaId)
.run()
}
fun clearArchiveData(attachmentIds: List<AttachmentId>) {
SqlUtil.buildCollectionQuery(ID, attachmentIds.map { it.id })
.forEach { query ->

View File

@@ -83,6 +83,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V222_DataHashRefact
import org.thoughtcrime.securesms.database.helpers.migration.V223_AddNicknameAndNoteFieldsToRecipientTable
import org.thoughtcrime.securesms.database.helpers.migration.V224_AddAttachmentArchiveColumns
import org.thoughtcrime.securesms.database.helpers.migration.V225_AddLocalUserJoinedStateAndGroupCallActiveState
import org.thoughtcrime.securesms.database.helpers.migration.V226_AddAttachmentMediaIdIndex
/**
* Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness.
@@ -168,10 +169,11 @@ object SignalDatabaseMigrations {
222 to V222_DataHashRefactor,
223 to V223_AddNicknameAndNoteFieldsToRecipientTable,
224 to V224_AddAttachmentArchiveColumns,
225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState
225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState,
226 to V226_AddAttachmentMediaIdIndex
)
const val DATABASE_VERSION = 225
const val DATABASE_VERSION = 226
@JvmStatic
fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {

View File

@@ -0,0 +1,19 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.database.helpers.migration
import android.app.Application
import net.zetetic.database.sqlcipher.SQLiteDatabase
/**
* Adds index to archive_media_id
*/
@Suppress("ClassName")
object V226_AddAttachmentMediaIdIndex : SignalDatabaseMigration {
override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
db.execSQL("CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON attachment (archive_media_id);")
}
}

View File

@@ -223,6 +223,7 @@ public final class JobManagerFactories {
put(StoryOnboardingDownloadJob.KEY, new StoryOnboardingDownloadJob.Factory());
put(SubmitRateLimitPushChallengeJob.KEY, new SubmitRateLimitPushChallengeJob.Factory());
put(Svr2MirrorJob.KEY, new Svr2MirrorJob.Factory());
put(SyncArchivedMediaJob.KEY, new SyncArchivedMediaJob.Factory());
put(ThreadUpdateJob.KEY, new ThreadUpdateJob.Factory());
put(TrimThreadJob.KEY, new TrimThreadJob.Factory());
put(TypingSendJob.KEY, new TypingSendJob.Factory());

View File

@@ -0,0 +1,107 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.backup.v2.ArchivedMediaObject
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JsonJobData
import org.whispersystems.signalservice.api.archive.ArchiveGetMediaItemsResponse
import org.whispersystems.signalservice.api.push.exceptions.NetworkFailureException
import java.lang.Exception
/**
* Job responsible for keeping remote archive media objects in sync. That is
* we make sure our CDN number aligns on all media ids, as well as deleting any
* extra media ids that we don't know about.
*/
class SyncArchivedMediaJob private constructor(
parameters: Parameters,
private var jobCursor: String?
) : BaseJob(parameters) {
companion object {
private val TAG = Log.tag(BackupRestoreMediaJob::class.java)
private const val KEY_CURSOR = "cursor"
const val KEY = "SyncArchivedMediaJob"
}
constructor(cursor: String? = null) : this(
Parameters.Builder()
.setQueue("SyncArchivedMedia")
.setMaxAttempts(Parameters.UNLIMITED)
.setMaxInstancesForQueue(2)
.build(),
cursor
)
override fun serialize(): ByteArray? {
return JsonJobData.Builder()
.putString(KEY_CURSOR, jobCursor)
.serialize()
}
override fun getFactoryKey(): String = KEY
override fun onFailure() = Unit
override fun onRun() {
val batchSize = 100
val attachmentsToDelete = HashSet<ArchivedMediaObject>()
var cursor: String? = jobCursor
do {
val archivedItemPage = BackupRepository.listRemoteMediaObjects(batchSize, cursor).successOrThrow()
attachmentsToDelete += syncPage(archivedItemPage)
cursor = archivedItemPage.cursor
if (attachmentsToDelete.size >= batchSize) {
BackupRepository.deleteAbandonedMediaObjects(attachmentsToDelete)
Log.i(TAG, "Deleted ${attachmentsToDelete.size} attachments off CDN")
attachmentsToDelete.clear()
}
if (attachmentsToDelete.isEmpty()) {
jobCursor = archivedItemPage.cursor
}
} while (cursor != null)
if (attachmentsToDelete.isNotEmpty()) {
BackupRepository.deleteAbandonedMediaObjects(attachmentsToDelete)
Log.i(TAG, "Deleted ${attachmentsToDelete.size} attachments off CDN")
}
}
/**
* Update CDNs of archived media items. Returns set of objects that don't match
* to a local attachment DB row.
*/
private fun syncPage(archivedItemPage: ArchiveGetMediaItemsResponse): Set<ArchivedMediaObject> {
val abandonedObjects = HashSet<ArchivedMediaObject>()
SignalDatabase.rawDatabase.withinTransaction {
archivedItemPage.storedMediaObjects.forEach { storedMediaObject ->
val rows = SignalDatabase.attachments.updateArchiveCdnByMediaId(archiveMediaId = storedMediaObject.mediaId, archiveCdn = storedMediaObject.cdn)
if (rows == 0) {
abandonedObjects.add(ArchivedMediaObject(storedMediaObject.mediaId, storedMediaObject.cdn))
}
}
}
return abandonedObjects
}
override fun onShouldRetry(e: Exception): Boolean {
return e is NetworkFailureException
}
class Factory : Job.Factory<SyncArchivedMediaJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): SyncArchivedMediaJob {
val data = JsonJobData.deserialize(serializedData)
return SyncArchivedMediaJob(parameters, if (data.hasString(KEY_CURSOR)) data.getString(KEY_CURSOR) else null)
}
}
}