Setup infra for better archive upload progress tracking.

This commit is contained in:
Greyson Parrelli
2024-09-19 10:38:05 -04:00
parent 1597ee70ba
commit 0e83e25e6e
15 changed files with 205 additions and 106 deletions

View File

@@ -5,13 +5,11 @@
package org.thoughtcrime.securesms.jobs
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.backup.v2.BackupV2Event
import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.keyvalue.SignalStore
import kotlin.time.Duration.Companion.days
/**
@@ -40,14 +38,11 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) :
override fun run(): Result {
val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload()
.map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId, forBackfill = true) }
.map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId) }
SignalDatabase.attachments.createKeyIvDigestForAttachmentsThatNeedArchiveUpload()
SignalStore.backup.totalAttachmentUploadCount = jobs.size.toLong()
SignalStore.backup.currentAttachmentUploadCount = 0
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, count = 0, estimatedTotalCount = jobs.size.toLong()))
ArchiveUploadProgress.onAttachmentsStarted(jobs.size.toLong())
Log.i(TAG, "Adding ${jobs.size} jobs to backfill attachments.")
AppDependencies.jobManager.addAll(jobs)

View File

@@ -5,11 +5,10 @@
package org.thoughtcrime.securesms.jobs
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.Stopwatch
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.BackupV2Event
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
@@ -70,13 +69,15 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
SignalDatabase.attachments.createKeyIvDigestForAttachmentsThatNeedArchiveUpload().takeIf { it > 0 }?.let { count -> Log.w(TAG, "Needed to create $count key/iv/digests.") }
stopwatch.split("key-iv-digest")
EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.PROGRESS_MESSAGES, count = 0, estimatedTotalCount = 0))
ArchiveUploadProgress.begin()
val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application)
val outputStream = FileOutputStream(tempBackupFile)
BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false)
stopwatch.split("export")
ArchiveUploadProgress.onMessageBackupCreated()
FileInputStream(tempBackupFile).use {
when (val result = BackupRepository.uploadBackupFile(it, tempBackupFile.length())) {
is NetworkResult.Success -> Log.i(TAG, "Successfully uploaded backup file.")
@@ -95,7 +96,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
SignalStore.backup.lastBackupTime = System.currentTimeMillis()
SignalStore.backup.usedBackupMediaSpace = when (val result = BackupRepository.getRemoteBackupUsedSpace()) {
is NetworkResult.Success -> result.result ?: 0
is NetworkResult.NetworkError -> SignalStore.backup.usedBackupMediaSpace // TODO enqueue a secondary job to fetch the latest number -- no need to fail this one
is NetworkResult.NetworkError -> SignalStore.backup.usedBackupMediaSpace // TODO [backup] enqueue a secondary job to fetch the latest number -- no need to fail this one
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "Failed to get used space: ${result.code}")
SignalStore.backup.usedBackupMediaSpace
@@ -110,7 +111,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
AppDependencies.jobManager.add(ArchiveAttachmentBackfillJob())
} else {
Log.i(TAG, "No attachments need to be uploaded, we can finish.")
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, 0, 0))
ArchiveUploadProgress.onMessageBackupFinishedEarly()
}
return Result.success()

View File

@@ -1,12 +1,11 @@
package org.thoughtcrime.securesms.jobs
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.Cdn
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.backup.ArchiveUploadProgress
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.BackupV2Event
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
@@ -24,7 +23,7 @@ import java.util.concurrent.TimeUnit
* Copies and re-encrypts attachments from the attachment cdn to the archive cdn.
* If it's discovered that the attachment no longer exists on the attachment cdn, this job will schedule a re-upload via [UploadAttachmentToArchiveJob].
*/
class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, private val forBackfill: Boolean, parameters: Parameters) : Job(parameters) {
class CopyAttachmentToArchiveJob private constructor(private val attachmentId: AttachmentId, parameters: Parameters) : Job(parameters) {
companion object {
private val TAG = Log.tag(CopyAttachmentToArchiveJob::class.java)
@@ -35,9 +34,8 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
val ALLOWED_SOURCE_CDNS = setOf(Cdn.CDN_2, Cdn.CDN_3)
}
constructor(attachmentId: AttachmentId, forBackfill: Boolean = false) : this(
constructor(attachmentId: AttachmentId) : this(
attachmentId = attachmentId,
forBackfill = forBackfill,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
@@ -47,8 +45,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
)
override fun serialize(): ByteArray = CopyAttachmentToArchiveJobData(
attachmentId = attachmentId.id,
forBackfill = forBackfill
attachmentId = attachmentId.id
).encode()
override fun getFactoryKey(): String = KEY
@@ -139,32 +136,14 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId)
SignalStore.backup.usedBackupMediaSpace += AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(attachment.size))
incrementBackfillProgressIfNecessary()
ArchiveUploadProgress.onAttachmentFinished()
}
return result
}
override fun onFailure() {
incrementBackfillProgressIfNecessary()
}
private fun incrementBackfillProgressIfNecessary() {
if (!forBackfill) {
return
}
if (SignalStore.backup.totalAttachmentUploadCount > 0) {
SignalStore.backup.currentAttachmentUploadCount++
if (SignalStore.backup.currentAttachmentUploadCount >= SignalStore.backup.totalAttachmentUploadCount) {
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.FINISHED, count = 0, estimatedTotalCount = 0))
SignalStore.backup.currentAttachmentUploadCount = 0
SignalStore.backup.totalAttachmentUploadCount = 0
} else {
EventBus.getDefault().postSticky(BackupV2Event(BackupV2Event.Type.PROGRESS_ATTACHMENTS, count = SignalStore.backup.currentAttachmentUploadCount, estimatedTotalCount = SignalStore.backup.totalAttachmentUploadCount))
}
}
ArchiveUploadProgress.onAttachmentFinished()
}
class Factory : Job.Factory<CopyAttachmentToArchiveJob> {
@@ -172,7 +151,6 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
val jobData = CopyAttachmentToArchiveJobData.ADAPTER.decode(serializedData!!)
return CopyAttachmentToArchiveJob(
attachmentId = AttachmentId(jobData.attachmentId),
forBackfill = jobData.forBackfill,
parameters = parameters
)
}

View File

@@ -33,7 +33,6 @@ import kotlin.time.Duration.Companion.days
class UploadAttachmentToArchiveJob private constructor(
private val attachmentId: AttachmentId,
private var uploadSpec: ResumableUpload?,
private val forBackfill: Boolean,
parameters: Parameters
) : Job(parameters) {
@@ -44,10 +43,9 @@ class UploadAttachmentToArchiveJob private constructor(
fun buildQueueKey(attachmentId: AttachmentId) = "ArchiveAttachmentJobs_${attachmentId.id}"
}
constructor(attachmentId: AttachmentId, forBackfill: Boolean = false) : this(
constructor(attachmentId: AttachmentId) : this(
attachmentId = attachmentId,
uploadSpec = null,
forBackfill = forBackfill,
parameters = Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(30.days.inWholeMilliseconds)
@@ -57,8 +55,7 @@ class UploadAttachmentToArchiveJob private constructor(
)
override fun serialize(): ByteArray = UploadAttachmentToArchiveJobData(
attachmentId = attachmentId.id,
forBackfill = forBackfill
attachmentId = attachmentId.id
).encode()
override fun getFactoryKey(): String = KEY
@@ -97,7 +94,7 @@ class UploadAttachmentToArchiveJob private constructor(
if (attachment.archiveTransferState == AttachmentTable.ArchiveTransferState.COPY_PENDING) {
Log.i(TAG, "[$attachmentId] Already marked as pending transfer. Enqueueing a copy job just in case.")
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId, forBackfill = forBackfill))
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId))
return Result.success()
}
@@ -154,7 +151,7 @@ class UploadAttachmentToArchiveJob private constructor(
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult)
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId, forBackfill = forBackfill))
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId))
return Result.success()
}
@@ -207,7 +204,6 @@ class UploadAttachmentToArchiveJob private constructor(
return UploadAttachmentToArchiveJob(
attachmentId = AttachmentId(data.attachmentId),
uploadSpec = data.uploadSpec,
forBackfill = data.forBackfill,
parameters = parameters
)
}