Fix potential performance issues during backup attachment upload.

This commit is contained in:
Greyson Parrelli
2025-11-19 10:37:43 -05:00
committed by Cody Henthorne
parent eebf3e0836
commit 7978cc668d
7 changed files with 199 additions and 91 deletions

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.backup.v2
import org.signal.core.util.ThreadUtil
import org.signal.core.util.concurrent.SignalExecutors
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.util.ThrottledDebouncer
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import kotlin.time.Duration.Companion.seconds
/**
* During media upload/restore, we tend to hammer the database from a lot of different threads at once. This can block writes for more urgent things, like message
* sends. To reduce the impact, we put all of our database writes on a single-thread executor.
*/
object ArchiveDatabaseExecutor {
val executor = Executors.newSingleThreadExecutor(SignalExecutors.NumberedThreadFactory("archive-db", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD))
/**
* By default, downloading/uploading an attachment wants to notify a bunch of database observation listeners. This slams the observer so hard that other
* people using it will experience massive delays in notifications. To avoid this, we turn off notifications for downloads, and then use this notifier to
* push some out every so often.
*/
val databaseObserverNotifier = ThrottledDebouncer(5.seconds.inWholeMilliseconds)
val notifyAttachmentObservers = {
AppDependencies.databaseObserver.notifyConversationListListeners()
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
val notifyAttachmentAndChatListObservers = {
AppDependencies.databaseObserver.notifyConversationListListeners()
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
fun <T> runBlocking(block: () -> T): T {
return try {
executor.submit(block).get()
} catch (e: ExecutionException) {
throw e.cause ?: e
}
}
fun throttledNotifyAttachmentObservers() {
databaseObserverNotifier.publish(notifyAttachmentObservers)
}
fun throttledNotifyAttachmentAndChatListObservers() {
databaseObserverNotifier.publish(notifyAttachmentAndChatListObservers)
}
}

View File

@@ -924,7 +924,7 @@ class AttachmentTable(
/**
* Sets the archive transfer state for the given attachment and all other attachments that share the same data file.
*/
fun setArchiveTransferState(id: AttachmentId, state: ArchiveTransferState) {
fun setArchiveTransferState(id: AttachmentId, state: ArchiveTransferState, notify: Boolean = true) {
writableDatabase.withinTransaction {
val dataFile: String = readableDatabase
.select(DATA_FILE)
@@ -940,14 +940,16 @@ class AttachmentTable(
.run()
}
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
if (notify) {
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
}
/**
* Sets the archive transfer state for the given attachment id, remote key, and plain text hash tuple and all other attachments that
* share the same data file.
*/
fun setArchiveTransferState(id: AttachmentId, remoteKey: String, plaintextHash: String, state: ArchiveTransferState): Boolean {
fun setArchiveTransferState(id: AttachmentId, remoteKey: String, plaintextHash: String, state: ArchiveTransferState, notify: Boolean = true): Boolean {
writableDatabase.withinTransaction {
val dataFile: String = readableDatabase
.select(DATA_FILE)
@@ -963,7 +965,9 @@ class AttachmentTable(
.run()
}
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
if (notify) {
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
return true
}

View File

@@ -10,6 +10,7 @@ import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.attachments.PointerAttachment
import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.hadIntegrityCheckPerformed
import org.thoughtcrime.securesms.backup.v2.requireThumbnailMediaName
@@ -99,7 +100,9 @@ class ArchiveThumbnailUploadJob private constructor(
val transferStatus = SignalDatabase.attachments.getArchiveThumbnailTransferState(attachmentId) ?: return
if (transferStatus == AttachmentTable.ArchiveTransferState.NONE) {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS)
}
}
}
@@ -117,31 +120,41 @@ class ArchiveThumbnailUploadJob private constructor(
if (!MediaUtil.isImageOrVideoType(attachment.contentType)) {
Log.w(TAG, "$attachmentId isn't visual media (contentType = ${attachment.contentType}). Skipping.")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (attachment.quote) {
Log.w(TAG, "$attachmentId is a quote. Skipping.")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (attachment.dataHash == null || attachment.remoteKey == null) {
Log.w(TAG, "$attachmentId is missing necessary ingredients for a mediaName!")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (!attachment.hadIntegrityCheckPerformed()) {
Log.w(TAG, "$attachmentId has no integrity check! Cannot proceed.")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (SignalDatabase.messages.isStory(attachment.mmsId)) {
Log.w(TAG, "$attachmentId is a story. Skipping.")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
@@ -150,12 +163,16 @@ class ArchiveThumbnailUploadJob private constructor(
val thumbnailResult = generateThumbnailIfPossible(attachment)
if (thumbnailResult == null) {
Log.w(TAG, "Unable to generate a thumbnail result for $attachmentId")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE)
}
return Result.success()
}
if (isCanceled) {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
}
return Result.failure()
}
@@ -172,7 +189,9 @@ class ArchiveThumbnailUploadJob private constructor(
}
if (isCanceled) {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
}
return Result.failure()
}
@@ -199,7 +218,9 @@ class ArchiveThumbnailUploadJob private constructor(
}
if (isCanceled) {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
}
return Result.failure()
}
@@ -220,15 +241,17 @@ class ArchiveThumbnailUploadJob private constructor(
return when (val result = BackupRepository.copyThumbnailToArchive(attachmentPointer, attachment)) {
is NetworkResult.Success -> {
// save attachment thumbnail
SignalDatabase.attachments.finalizeAttachmentThumbnailAfterUpload(
attachmentId = attachmentId,
attachmentPlaintextHash = attachment.dataHash,
attachmentRemoteKey = attachment.remoteKey,
data = thumbnailResult.data
)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.finalizeAttachmentThumbnailAfterUpload(
attachmentId = attachmentId,
attachmentPlaintextHash = attachment.dataHash,
attachmentRemoteKey = attachment.remoteKey,
data = thumbnailResult.data
)
Log.d(TAG, "Successfully archived thumbnail for $attachmentId")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED)
Log.d(TAG, "Successfully archived thumbnail for $attachmentId")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.FINISHED)
}
Result.success()
}
@@ -249,10 +272,14 @@ class ArchiveThumbnailUploadJob private constructor(
override fun onFailure() {
if (this.isCanceled) {
Log.w(TAG, "[$attachmentId] Job was canceled, updating archive thumbnail transfer state to ${AttachmentTable.ArchiveTransferState.NONE}.")
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
} else {
Log.w(TAG, "[$attachmentId] Job failed, updating archive thumbnail transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE} (if not already a permanent failure).")
SignalDatabase.attachments.setArchiveThumbnailTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveThumbnailTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
}
}
}

View File

@@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.Cdn
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
@@ -65,14 +66,14 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
if (transferStatus == AttachmentTable.ArchiveTransferState.NONE || transferStatus == AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS) {
Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING)
}
}
override fun run(): Result {
if (SignalStore.account.isLinkedDevice) {
Log.w(TAG, "[$attachmentId] Linked devices don't backup media. Skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.success()
}
@@ -100,25 +101,25 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
if (SignalDatabase.messages.isStory(attachment.mmsId)) {
Log.i(TAG, "[$attachmentId] Attachment is a story. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.success()
}
if (SignalDatabase.messages.isViewOnce(attachment.mmsId)) {
Log.i(TAG, "[$attachmentId] Attachment is view-once. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.success()
}
if (SignalDatabase.messages.willMessageExpireBeforeCutoff(attachment.mmsId)) {
Log.i(TAG, "[$attachmentId] Message will expire in less than 24 hours. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.success()
}
if (attachment.contentType == MediaUtil.LONG_TEXT) {
Log.i(TAG, "[$attachmentId] Attachment is long text. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.success()
}
@@ -148,7 +149,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
when (archiveResult.code) {
400 -> {
Log.w(TAG, "[$attachmentId] Something is invalid about our request. Possibly the length. Scheduling a re-upload. Body: ${archiveResult.exception.stringBody}")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId, canReuseUpload = false))
Result.success()
}
@@ -158,7 +159,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
}
410 -> {
Log.w(TAG, "[$attachmentId] The attachment no longer exists on the transit tier. Scheduling a re-upload.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
AppDependencies.jobManager.add(UploadAttachmentToArchiveJob(attachmentId, canReuseUpload = false))
Result.success()
}
@@ -197,7 +198,10 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
if (result.isSuccess) {
Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.FINISHED}")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, attachment.remoteKey!!, attachment.dataHash!!, AttachmentTable.ArchiveTransferState.FINISHED)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferState(attachmentId, attachment.remoteKey!!, attachment.dataHash!!, AttachmentTable.ArchiveTransferState.FINISHED, notify = false)
ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers()
}
if (!isCanceled && !attachment.quote) {
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId)
@@ -221,15 +225,26 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
override fun onFailure() {
if (this.isCanceled) {
Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING)
}
} else {
Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE}.")
SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
}
}
ArchiveUploadProgress.onAttachmentFinished(attachmentId)
}
private fun setArchiveTransferStateWithDelayedNotification(attachmentId: AttachmentId, transferState: AttachmentTable.ArchiveTransferState) {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferState(attachmentId, transferState, notify = false)
ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers()
}
}
class Factory : Job.Factory<CopyAttachmentToArchiveJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): CopyAttachmentToArchiveJob {
val jobData = CopyAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!)

View File

@@ -15,8 +15,6 @@ import androidx.core.content.ContextCompat
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.Base64.decodeBase64OrThrow
import org.signal.core.util.PendingIntentFlags
import org.signal.core.util.ThreadUtil
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.isNotNullOrBlank
import org.signal.core.util.logging.Log
import org.signal.libsignal.protocol.InvalidMacException
@@ -25,6 +23,7 @@ import org.thoughtcrime.securesms.R
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.attachments.InvalidAttachmentException
import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor
import org.thoughtcrime.securesms.backup.v2.ArchiveRestoreProgress
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.createArchiveAttachmentPointer
@@ -53,7 +52,6 @@ import org.thoughtcrime.securesms.stickers.StickerLocator
import org.thoughtcrime.securesms.transport.RetryLaterException
import org.thoughtcrime.securesms.util.RemoteConfig
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.thoughtcrime.securesms.util.ThrottledDebouncer
import org.whispersystems.signalservice.api.crypto.AttachmentCipherInputStream.IntegrityCheck
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
@@ -63,9 +61,6 @@ import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
import org.whispersystems.signalservice.api.push.exceptions.RangeException
import java.io.File
import java.io.IOException
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.jvm.optionals.getOrNull
import kotlin.math.abs
@@ -74,7 +69,6 @@ import kotlin.math.pow
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
/**
* Download attachment from locations as specified in their record.
@@ -129,23 +123,6 @@ class RestoreAttachmentJob private constructor(
const val KEY = "RestoreAttachmentJob"
private val TAG = Log.tag(RestoreAttachmentJob::class.java)
/**
* During media restore, we tend to hammer the database from a lot of different threads at once. This can block writes for more urgent things, like message
* sends. To reduce the impact, we put all of our database writes on a single-thread executor.
*/
private val DB_EXECUTOR = Executors.newSingleThreadExecutor(SignalExecutors.NumberedThreadFactory("restore-db", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD))
/**
* By default, downloading an attachment wants to notify a bunch of database observation listeners. This slams the observer so hard that other people using
* it will experience massive delays in notifications. To avoid this, we turn off notifications for downloads, and then use this notifier to push some
* out every so often.
*/
val DATABASE_OBSERVER_NOTIFIER = ThrottledDebouncer(5.seconds.inWholeMilliseconds)
val NOTIFY_DATABASE_OBSERVERS = {
AppDependencies.databaseObserver.notifyConversationListListeners()
AppDependencies.databaseObserver.notifyAttachmentUpdatedObservers()
}
/**
* Create a restore job for the initial large batch of media on a fresh restore.
* Will enqueue with some amount of parallelization with low job priority.
@@ -230,7 +207,7 @@ class RestoreAttachmentJob private constructor(
}
override fun onAdded() {
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_RESTORE_IN_PROGRESS)
}
}
@@ -287,9 +264,9 @@ class RestoreAttachmentJob private constructor(
dataStream?.use { input ->
Log.i(TAG, "[$attachmentId] Attachment is sticker, restoring from local storage")
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null, notify = false)
DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS)
ArchiveDatabaseExecutor.throttledNotifyAttachmentAndChatListObservers()
}
return
}
@@ -316,7 +293,7 @@ class RestoreAttachmentJob private constructor(
override fun onFailure() {
if (isCanceled) {
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setTransferState(messageId, attachmentId, AttachmentTable.TRANSFER_RESTORE_OFFLOADED)
}
} else {
@@ -359,7 +336,7 @@ class RestoreAttachmentJob private constructor(
forceTransitTier: Boolean = false
) {
val maxReceiveSize: Long = RemoteConfig.maxAttachmentReceiveSizeBytes
val attachmentFile: File = DB_EXECUTOR.runBlocking {
val attachmentFile: File = ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.getOrCreateTransferFile(attachmentId)
}
var useArchiveCdn = false
@@ -435,11 +412,11 @@ class RestoreAttachmentJob private constructor(
archiveRestore = true,
notify = false
)
DATABASE_OBSERVER_NOTIFIER.publish(NOTIFY_DATABASE_OBSERVERS)
ArchiveDatabaseExecutor.throttledNotifyAttachmentAndChatListObservers()
}
if (useArchiveCdn && attachment.archiveCdn == null) {
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveCdn(attachmentId, pointer.cdnNumber)
}
}
@@ -510,7 +487,7 @@ class RestoreAttachmentJob private constructor(
}
} catch (e: org.signal.libsignal.protocol.incrementalmac.InvalidMacException) {
Log.w(TAG, "[$attachmentId] Detected an invalid incremental mac. Clearing and marking as a temporary failure, requiring the user to manually try again.")
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.clearIncrementalMacsForAttachmentAndAnyDuplicates(attachmentId, attachment.remoteKey, attachment.dataHash)
}
markFailed(attachmentId)
@@ -520,13 +497,13 @@ class RestoreAttachmentJob private constructor(
}
private fun markFailed(attachmentId: AttachmentId) {
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED)
}
}
private fun markPermanentlyFailed(attachmentId: AttachmentId) {
DB_EXECUTOR.runBlocking {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE)
}
}
@@ -571,14 +548,6 @@ class RestoreAttachmentJob private constructor(
NotificationManagerCompat.from(context).notify(NotificationIds.INTERNAL_ERROR, notification)
}
private fun <T> ExecutorService.runBlocking(block: () -> T): T {
return try {
this.submit(block).get()
} catch (e: ExecutionException) {
throw e.cause ?: e
}
}
class Factory : Job.Factory<RestoreAttachmentJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): RestoreAttachmentJob {
val data = RestoreAttachmentJobData.ADAPTER.decode(serializedData!!)

View File

@@ -8,6 +8,7 @@ import org.signal.core.util.logging.Log
import org.signal.libsignal.protocol.InvalidMessageException
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.InvalidAttachmentException
import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.createArchiveThumbnailPointer
import org.thoughtcrime.securesms.backup.v2.requireThumbnailMediaName
@@ -141,7 +142,9 @@ class RestoreAttachmentThumbnailJob private constructor(
)
decryptingStream.use { input ->
SignalDatabase.attachments.finalizeAttachmentThumbnailAfterDownload(attachmentId, attachment.dataHash, attachment.remoteKey, input, thumbnailTransferFile)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.finalizeAttachmentThumbnailAfterDownload(attachmentId, attachment.dataHash, attachment.remoteKey, input, thumbnailTransferFile)
}
}
if (!SignalDatabase.messages.isStory(messageId)) {
@@ -152,7 +155,9 @@ class RestoreAttachmentThumbnailJob private constructor(
override fun onFailure() {
Log.w(TAG, format(this, "onFailure() thumbnail messageId: $messageId attachmentId: $attachmentId "))
SignalDatabase.attachments.setThumbnailRestoreProgressFailed(attachmentId, messageId)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setThumbnailRestoreProgressFailed(attachmentId, messageId)
}
}
override fun onShouldRetry(exception: Exception): Boolean {

View File

@@ -16,6 +16,7 @@ import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
import org.thoughtcrime.securesms.backup.v2.ArchiveDatabaseExecutor
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
@@ -97,7 +98,9 @@ class UploadAttachmentToArchiveJob private constructor(
if (transferStatus == AttachmentTable.ArchiveTransferState.NONE) {
Log.d(TAG, "[$attachmentId] Updating archive transfer state to ${AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS}")
SignalDatabase.attachments.setArchiveTransferStateUnlessPermanentFailure(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferStateUnlessPermanentFailure(attachmentId, AttachmentTable.ArchiveTransferState.UPLOAD_IN_PROGRESS)
}
}
}
@@ -109,13 +112,17 @@ class UploadAttachmentToArchiveJob private constructor(
if (SignalStore.account.isLinkedDevice) {
Log.w(TAG, "[$attachmentId] Linked devices don't backup media. Skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (!SignalStore.backup.backsUpMedia) {
Log.w(TAG, "[$attachmentId] This user does not back up media. Skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
@@ -149,25 +156,33 @@ class UploadAttachmentToArchiveJob private constructor(
if (SignalDatabase.messages.isStory(attachment.mmsId)) {
Log.i(TAG, "[$attachmentId] Attachment is a story. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (SignalDatabase.messages.isViewOnce(attachment.mmsId)) {
Log.i(TAG, "[$attachmentId] Attachment is a view-once. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (SignalDatabase.messages.willMessageExpireBeforeCutoff(attachment.mmsId)) {
Log.i(TAG, "[$attachmentId] Message will expire within 24 hours. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
if (attachment.contentType == MediaUtil.LONG_TEXT) {
Log.i(TAG, "[$attachmentId] Attachment is long text. Resetting transfer state to none and skipping.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
return Result.success()
}
@@ -226,7 +241,9 @@ class UploadAttachmentToArchiveJob private constructor(
)
} catch (e: FileNotFoundException) {
Log.w(TAG, "[$attachmentId] No file exists for this attachment! Marking as a permanent failure.", e)
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
setArchiveTransferStateWithDelayedNotification(attachmentId, AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE)
}
return Result.failure()
} catch (e: IOException) {
Log.w(TAG, "[$attachmentId] Failed while reading the stream.", e)
@@ -249,7 +266,9 @@ class UploadAttachmentToArchiveJob private constructor(
.use { it.readLength() }
if (actualLength != attachment.size) {
Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception)
SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength)
}
} else {
Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.")
}
@@ -272,7 +291,9 @@ class UploadAttachmentToArchiveJob private constructor(
}
Log.d(TAG, "[$attachmentId] Upload complete!")
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult)
}
}
if (!isCanceled) {
@@ -287,10 +308,14 @@ class UploadAttachmentToArchiveJob private constructor(
override fun onFailure() {
if (this.isCanceled) {
Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.NONE}.")
SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
}
} else {
Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE} (if not already a permanent failure).")
SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferStateFailure(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE)
}
}
}
@@ -334,6 +359,13 @@ class UploadAttachmentToArchiveJob private constructor(
}
}
private fun setArchiveTransferStateWithDelayedNotification(attachmentId: AttachmentId, transferState: AttachmentTable.ArchiveTransferState) {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setArchiveTransferState(attachmentId, transferState, notify = false)
ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers()
}
}
class Factory : Job.Factory<UploadAttachmentToArchiveJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): UploadAttachmentToArchiveJob {
val data = UploadAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!)