From 7b88b0717060ea5c5c367f36c08934b9efaf21c6 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 7 Aug 2025 16:27:08 -0400 Subject: [PATCH] Increase parallelization of attachment archive uploads. --- .../securesms/backup/ArchiveUploadProgress.kt | 6 ++--- .../securesms/jobmanager/JobManager.java | 11 ++++++++ .../jobs/ArchiveThumbnailUploadJob.kt | 25 ++++++++++++++++++- .../jobs/CopyAttachmentToArchiveJob.kt | 2 +- .../jobs/UploadAttachmentToArchiveJob.kt | 20 ++++++--------- .../securesms/keyvalue/InternalValues.kt | 1 - 6 files changed, 46 insertions(+), 19 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt index 17b1aacad6..aff106637b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt @@ -114,9 +114,7 @@ object ArchiveUploadProgress { BackupMessagesJob.cancel() AppDependencies.jobManager.cancelAllInQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) - UploadAttachmentToArchiveJob.getAllQueueKeys().forEach { - AppDependencies.jobManager.cancelAllInQueue(it) - } + AppDependencies.jobManager.cancelAllInQueues(UploadAttachmentToArchiveJob.QUEUES) AppDependencies.jobManager.cancelAllInQueue(ArchiveThumbnailUploadJob.KEY) } @@ -129,7 +127,7 @@ object ArchiveUploadProgress { Log.d(TAG, "Flushing job manager queue...") AppDependencies.jobManager.flush() - val queues = setOf(ArchiveThumbnailUploadJob.KEY, ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) + UploadAttachmentToArchiveJob.getAllQueueKeys() + val queues = UploadAttachmentToArchiveJob.QUEUES + ArchiveThumbnailUploadJob.QUEUES + ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE Log.d(TAG, "Waiting for cancelations to occur...") while (!AppDependencies.jobManager.areQueuesEmpty(queues)) { delay(1.seconds) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index e2d98d64b2..c562d342f6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -275,6 +275,17 @@ public class JobManager implements ConstraintObserver.Notifier { runOnExecutor(() -> jobController.cancelAllInQueue(queue)); } + /** + * Cancels all jobs in the specified queues. See {@link #cancel(String)} for details. + */ + public void cancelAllInQueues(@NonNull Collection queues) { + runOnExecutor(() -> { + for (String queue : queues) { + jobController.cancelAllInQueue(queue); + } + }); + } + /** * Perform an arbitrary update on enqueued jobs. Will not apply to jobs that are already running. * You shouldn't use this if you can help it. You give yourself an opportunity to really screw diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt index f97f634828..13bce87d96 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveThumbnailUploadJob.kt @@ -54,6 +54,12 @@ class ArchiveThumbnailUploadJob private constructor( private const val MAX_PIXEL_DIMENSION = 256 private const val ADDITIONAL_QUALITY_DECREASE = 10f + /** A set of possible queues this job may use. The number of queues determines the parallelism. */ + val QUEUES = setOf( + "ArchiveThumbnailUploadJob_1", + "ArchiveThumbnailUploadJob_2" + ) + fun enqueueIfNecessary(attachmentId: AttachmentId) { if (SignalStore.backup.backsUpMedia) { AppDependencies.jobManager.add(ArchiveThumbnailUploadJob(attachmentId)) @@ -67,10 +73,11 @@ class ArchiveThumbnailUploadJob private constructor( private constructor(attachmentId: AttachmentId) : this( Parameters.Builder() - .setQueue("ArchiveThumbnailUploadJob") + .setQueue(QUEUES.random()) .addConstraint(NetworkConstraint.KEY) .setLifespan(1.days.inWholeMilliseconds) .setMaxAttempts(Parameters.UNLIMITED) + .setGlobalPriority(Parameters.PRIORITY_LOW) .build(), attachmentId ) @@ -104,6 +111,10 @@ class ArchiveThumbnailUploadJob private constructor( return Result.success() } + if (isCanceled) { + return Result.failure() + } + val mediaRootBackupKey = SignalStore.backup.mediaRootBackupKey val specResult = BackupRepository @@ -116,6 +127,10 @@ class ArchiveThumbnailUploadJob private constructor( ) } + if (isCanceled) { + return Result.failure() + } + val resumableUpload = when (specResult) { is NetworkResult.Success -> { Log.d(TAG, "Got an upload spec!") @@ -138,6 +153,10 @@ class ArchiveThumbnailUploadJob private constructor( } } + if (isCanceled) { + return Result.failure() + } + val attachmentPointer = try { buildSignalServiceAttachmentStream(thumbnailResult, resumableUpload).use { stream -> val pointer = AppDependencies.signalServiceMessageSender.uploadAttachment(stream) @@ -148,6 +167,10 @@ class ArchiveThumbnailUploadJob private constructor( return Result.retry(defaultBackoff()) } + if (isCanceled) { + return Result.failure() + } + return when (val result = BackupRepository.copyThumbnailToArchive(attachmentPointer, attachment)) { is NetworkResult.Success -> { // save attachment thumbnail diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt index d5acbcf96f..5de21ab90b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -48,7 +48,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A .addConstraint(NoRemoteArchiveGarbageCollectionPendingConstraint.KEY) .setLifespan(TimeUnit.DAYS.toMillis(1)) .setMaxAttempts(Parameters.UNLIMITED) - .setQueue(UploadAttachmentToArchiveJob.buildQueueKey()) + .setQueue(UploadAttachmentToArchiveJob.QUEUES.random()) .setQueuePriority(Parameters.PRIORITY_HIGH) .build() ) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 205d1e8339..1fcf5f5787 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -36,7 +36,6 @@ import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import java.io.FileNotFoundException import java.io.IOException import java.net.ProtocolException -import kotlin.random.Random import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.milliseconds @@ -54,17 +53,14 @@ class UploadAttachmentToArchiveJob private constructor( companion object { private val TAG = Log.tag(UploadAttachmentToArchiveJob::class) const val KEY = "UploadAttachmentToArchiveJob" - private const val MAX_JOB_QUEUES = 2 - /** - * This randomly selects between one of [MAX_JOB_QUEUES] queues. It's a fun way of limiting the concurrency of the upload jobs to - * take up at most two job runners. - */ - fun buildQueueKey( - queue: Int = Random.nextInt(0, MAX_JOB_QUEUES) - ) = "ArchiveAttachmentJobs_$queue" - - fun getAllQueueKeys() = (0 until MAX_JOB_QUEUES).map { buildQueueKey(queue = it) } + /** A set of possible queues this job may use. The number of queues determines the parallelism. */ + val QUEUES = setOf( + "ArchiveAttachmentJobs_1", + "ArchiveAttachmentJobs_2", + "ArchiveAttachmentJobs_3", + "ArchiveAttachmentJobs_4" + ) } constructor(attachmentId: AttachmentId, canReuseUpload: Boolean = true) : this( @@ -75,7 +71,7 @@ class UploadAttachmentToArchiveJob private constructor( .addConstraint(NetworkConstraint.KEY) .setLifespan(30.days.inWholeMilliseconds) .setMaxAttempts(Parameters.UNLIMITED) - .setQueue(buildQueueKey()) + .setQueue(QUEUES.random()) .build() ) diff --git a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/InternalValues.kt b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/InternalValues.kt index 25d836956f..dd68429933 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/InternalValues.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/InternalValues.kt @@ -3,7 +3,6 @@ package org.thoughtcrime.securesms.keyvalue import org.signal.ringrtc.CallManager.DataMode import org.thoughtcrime.securesms.BuildConfig import org.thoughtcrime.securesms.backup.v2.proto.BackupDebugInfo -import org.thoughtcrime.securesms.util.Environment import org.thoughtcrime.securesms.util.Environment.Calling.defaultSfuUrl import org.thoughtcrime.securesms.util.RemoteConfig