diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index ecaa627229..e773119970 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -219,7 +219,7 @@ object BackupRepository { } } - fun export(outputStream: OutputStream, append: (ByteArray) -> Unit, plaintext: Boolean = false, currentTime: Long = System.currentTimeMillis()) { + fun export(outputStream: OutputStream, append: (ByteArray) -> Unit, plaintext: Boolean = false, currentTime: Long = System.currentTimeMillis(), cancellationSignal: () -> Boolean = { false }) { val writer: BackupExportWriter = if (plaintext) { PlainTextBackupWriter(outputStream) } else { @@ -231,7 +231,7 @@ object BackupRepository { ) } - export(currentTime = currentTime, isLocal = false, writer = writer) + export(currentTime = currentTime, isLocal = false, writer = writer, cancellationSignal = cancellationSignal) } /** @@ -248,6 +248,7 @@ object BackupRepository { isLocal: Boolean, writer: BackupExportWriter, progressEmitter: ExportProgressListener? = null, + cancellationSignal: () -> Boolean = { false }, exportExtras: ((SignalDatabase) -> Unit)? = null ) { val eventTimer = EventTimer() @@ -260,6 +261,8 @@ object BackupRepository { val exportState = ExportState(backupTime = currentTime, mediaBackupEnabled = SignalStore.backup.backsUpMedia) + var frameCount = 0L + writer.use { writer.write( BackupInfo( @@ -267,6 +270,7 @@ object BackupRepository { backupTimeMs = exportState.backupTime ) ) + frameCount++ // We're using a snapshot, so the transaction is more for perf than correctness dbSnapshot.rawWritableDatabase.withinTransaction { @@ -274,43 +278,64 @@ object BackupRepository { AccountDataArchiveProcessor.export(dbSnapshot, signalStoreSnapshot) { writer.write(it) eventTimer.emit("account") + frameCount++ + } + if (cancellationSignal()) { + Log.w(TAG, "[import] Cancelled! Stopping") + return@export } progressEmitter?.onRecipient() RecipientArchiveProcessor.export(dbSnapshot, signalStoreSnapshot, exportState) { writer.write(it) eventTimer.emit("recipient") + frameCount++ + } + if (cancellationSignal()) { + Log.w(TAG, "[import] Cancelled! Stopping") + return@export } progressEmitter?.onThread() ChatArchiveProcessor.export(dbSnapshot, exportState) { frame -> writer.write(frame) eventTimer.emit("thread") + frameCount++ + } + if (cancellationSignal()) { + return@export } progressEmitter?.onCall() AdHocCallArchiveProcessor.export(dbSnapshot) { frame -> writer.write(frame) eventTimer.emit("call") + frameCount++ } progressEmitter?.onSticker() StickerArchiveProcessor.export(dbSnapshot) { frame -> writer.write(frame) eventTimer.emit("sticker-pack") + frameCount++ } progressEmitter?.onMessage() - ChatItemArchiveProcessor.export(dbSnapshot, exportState) { frame -> + ChatItemArchiveProcessor.export(dbSnapshot, exportState, cancellationSignal) { frame -> writer.write(frame) eventTimer.emit("message") + frameCount++ + + if (frameCount % 1000 == 0L) { + Log.d(TAG, "[export] Exported $frameCount frames so far.") + } } } } exportExtras?.invoke(dbSnapshot) - Log.d(TAG, "export() ${eventTimer.stop().summary}") + Log.d(TAG, "[export] totalFrames: $frameCount | ${eventTimer.stop().summary}") } finally { deleteDatabaseSnapshot(mainDbName) deleteDatabaseSnapshot(keyValueDbName) diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/ChatItemArchiveProcessor.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/ChatItemArchiveProcessor.kt index 157a2b9bf2..402660573f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/ChatItemArchiveProcessor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/ChatItemArchiveProcessor.kt @@ -22,15 +22,21 @@ import org.thoughtcrime.securesms.database.SignalDatabase object ChatItemArchiveProcessor { val TAG = Log.tag(ChatItemArchiveProcessor::class.java) - fun export(db: SignalDatabase, exportState: ExportState, emitter: BackupFrameEmitter) { + fun export(db: SignalDatabase, exportState: ExportState, cancellationSignal: () -> Boolean, emitter: BackupFrameEmitter) { db.messageTable.getMessagesForBackup(db, exportState.backupTime, exportState.mediaBackupEnabled).use { chatItems -> + var count = 0 while (chatItems.hasNext()) { + if (count % 1000 == 0 && cancellationSignal()) { + return@use + } + val chatItem: ChatItem? = chatItems.next() if (chatItem != null) { if (exportState.threadIds.contains(chatItem.chatId)) { emitter.emit(Frame(chatItem = chatItem)) } } + count++ } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundFragment.kt index 3a14f2ffeb..69f35ee7d8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundFragment.kt @@ -29,6 +29,7 @@ import androidx.compose.foundation.rememberScrollState import androidx.compose.foundation.shape.RoundedCornerShape import androidx.compose.foundation.verticalScroll import androidx.compose.material3.Button +import androidx.compose.material3.ButtonDefaults import androidx.compose.material3.Checkbox import androidx.compose.material3.CircularProgressIndicator import androidx.compose.material3.DropdownMenu @@ -56,6 +57,7 @@ import androidx.compose.runtime.remember import androidx.compose.runtime.setValue import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier +import androidx.compose.ui.graphics.Color import androidx.compose.ui.platform.LocalContext import androidx.compose.ui.res.painterResource import androidx.compose.ui.text.font.FontWeight @@ -185,7 +187,8 @@ class InternalBackupPlaygroundFragment : ComposeFragment() { .setPositiveButton("Wipe and restore") { _, _ -> viewModel.wipeAllDataAndRestoreFromRemote() } .show() }, - onBackupTierSelected = { tier -> viewModel.onBackupTierSelected(tier) } + onBackupTierSelected = { tier -> viewModel.onBackupTierSelected(tier) }, + onHaltAllJobs = { viewModel.haltAllJobs() } ) }, mediaContent = { snackbarHostState -> @@ -279,7 +282,8 @@ fun Screen( onCheckRemoteBackupStateClicked: () -> Unit = {}, onTriggerBackupJobClicked: () -> Unit = {}, onWipeDataAndRestoreClicked: () -> Unit = {}, - onBackupTierSelected: (MessageBackupTier?) -> Unit = {} + onBackupTierSelected: (MessageBackupTier?) -> Unit = {}, + onHaltAllJobs: () -> Unit = {} ) { val scrollState = rememberScrollState() val options = remember { @@ -320,12 +324,19 @@ fun Screen( Text("Enqueue remote backup") } - Buttons.LargeTonal( - onClick = onWipeDataAndRestoreClicked + Button( + onClick = onWipeDataAndRestoreClicked, + colors = ButtonDefaults.buttonColors(containerColor = Color(0xFFC33C00)) ) { Text("Wipe data and restore") } + Buttons.LargeTonal( + onClick = onHaltAllJobs + ) { + Text("Halt all backup jobs") + } + Dividers.Default() Row( diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt index 8c7ffab495..59de6bf516 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/internal/backup/InternalBackupPlaygroundViewModel.kt @@ -34,6 +34,7 @@ import org.thoughtcrime.securesms.database.MessageType import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobs.AttachmentUploadJob +import org.thoughtcrime.securesms.jobs.BackfillDigestJob import org.thoughtcrime.securesms.jobs.BackupMessagesJob import org.thoughtcrime.securesms.jobs.BackupRestoreJob import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob @@ -514,4 +515,11 @@ class InternalBackupPlaygroundViewModel : ViewModel() { fun MutableState.set(update: T.() -> T) { this.value = this.value.update() } + + fun haltAllJobs() { + AppDependencies.jobManager.cancelAllInQueue(BackfillDigestJob.QUEUE) + AppDependencies.jobManager.cancelAllInQueue("ArchiveAttachmentJobs_0") + AppDependencies.jobManager.cancelAllInQueue("ArchiveAttachmentJobs_1") + AppDependencies.jobManager.cancelAllInQueue("ArchiveThumbnailUploadJob") + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index dc74ae4631..1e93d42861 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -1068,6 +1068,23 @@ class AttachmentTable( } } + /** + * Updates the attachment (and all attachments that share the same data file) with a new length. + */ + fun updateAttachmentLength(attachmentId: AttachmentId, length: Long) { + val dataFile = getDataFileInfo(attachmentId) + if (dataFile == null) { + Log.w(TAG, "[$attachmentId] Failed to find data file!") + return + } + + writableDatabase + .update(TABLE_NAME) + .values(DATA_SIZE to length) + .where("$DATA_FILE = ?", dataFile.file.absolutePath) + .run() + } + /** * When we find out about a new inbound attachment pointer, we insert a row for it that contains all the info we need to download it via [insertAttachmentWithData]. * Later, we download the data for that pointer. Call this method once you have the data to associate it with the attachment. At this point, it is assumed @@ -2804,7 +2821,10 @@ class AttachmentTable( FINISHED(3), /** It is impossible to upload this attachment. */ - PERMANENT_FAILURE(4); + PERMANENT_FAILURE(4), + + /** Upload failed, but in a way where it may be worth retrying later. */ + TEMPORARY_FAILURE(5); companion object { fun deserialize(value: Int): ArchiveTransferState { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt index 0bcc0f7a69..68d3eaf8d1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt @@ -77,9 +77,13 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application) val outputStream = FileOutputStream(tempBackupFile) - BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false) + BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false, cancellationSignal = { this.isCanceled }) stopwatch.split("export") + if (isCanceled) { + return Result.failure() + } + ArchiveUploadProgress.onMessageBackupCreated() FileInputStream(tempBackupFile).use { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt index f69ebc1a4f..62993e14ff 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -148,6 +148,14 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A } override fun onFailure() { + if (this.isCanceled) { + Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.COPY_PENDING}.") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.COPY_PENDING) + } else { + Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE}.") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } + ArchiveUploadProgress.onAttachmentFinished() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 3097b881fb..5f5504137f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -7,6 +7,7 @@ package org.thoughtcrime.securesms.jobs import org.signal.core.util.Base64 import org.signal.core.util.logging.Log +import org.signal.core.util.readLength import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil @@ -24,6 +25,7 @@ import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult import java.io.IOException +import java.net.ProtocolException import kotlin.random.Random import kotlin.time.Duration.Companion.days @@ -146,6 +148,19 @@ class UploadAttachmentToArchiveJob private constructor( is NetworkResult.ApplicationError -> throw result.throwable is NetworkResult.NetworkError -> { Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception) + + if (result.exception.cause is ProtocolException) { + Log.w(TAG, "[$attachmentId] Length may be incorrect. Recalculating.", result.exception) + + val actualLength = SignalDatabase.attachments.getAttachmentStream(attachmentId, 0).readLength() + if (actualLength != attachment.size) { + Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception) + SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength) + } else { + Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.") + } + } + return Result.retry(defaultBackoff()) } is NetworkResult.StatusCodeError -> { @@ -162,7 +177,15 @@ class UploadAttachmentToArchiveJob private constructor( return Result.success() } - override fun onFailure() = Unit + override fun onFailure() { + if (this.isCanceled) { + Log.w(TAG, "[$attachmentId] Job was canceled, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.NONE}.") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE) + } else { + Log.w(TAG, "[$attachmentId] Job failed, updating archive transfer state to ${AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE}.") + SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) + } + } private fun fetchResumableUploadSpec(key: ByteArray, iv: ByteArray): Pair { val uploadSpec = BackupRepository