Increase parallelization of attachment archive uploads.

This commit is contained in:
Greyson Parrelli
2025-08-07 16:27:08 -04:00
parent 3273787a05
commit 7b88b07170
6 changed files with 46 additions and 19 deletions

View File

@@ -114,9 +114,7 @@ object ArchiveUploadProgress {
BackupMessagesJob.cancel()
AppDependencies.jobManager.cancelAllInQueue(ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE)
UploadAttachmentToArchiveJob.getAllQueueKeys().forEach {
AppDependencies.jobManager.cancelAllInQueue(it)
}
AppDependencies.jobManager.cancelAllInQueues(UploadAttachmentToArchiveJob.QUEUES)
AppDependencies.jobManager.cancelAllInQueue(ArchiveThumbnailUploadJob.KEY)
}
@@ -129,7 +127,7 @@ object ArchiveUploadProgress {
Log.d(TAG, "Flushing job manager queue...")
AppDependencies.jobManager.flush()
val queues = setOf(ArchiveThumbnailUploadJob.KEY, ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE) + UploadAttachmentToArchiveJob.getAllQueueKeys()
val queues = UploadAttachmentToArchiveJob.QUEUES + ArchiveThumbnailUploadJob.QUEUES + ArchiveCommitAttachmentDeletesJob.ARCHIVE_ATTACHMENT_QUEUE
Log.d(TAG, "Waiting for cancelations to occur...")
while (!AppDependencies.jobManager.areQueuesEmpty(queues)) {
delay(1.seconds)

View File

@@ -275,6 +275,17 @@ public class JobManager implements ConstraintObserver.Notifier {
runOnExecutor(() -> jobController.cancelAllInQueue(queue));
}
/**
* Cancels all jobs in the specified queues. See {@link #cancel(String)} for details.
*/
public void cancelAllInQueues(@NonNull Collection<String> queues) {
runOnExecutor(() -> {
for (String queue : queues) {
jobController.cancelAllInQueue(queue);
}
});
}
/**
* Perform an arbitrary update on enqueued jobs. Will not apply to jobs that are already running.
* You shouldn't use this if you can help it. You give yourself an opportunity to really screw

View File

@@ -54,6 +54,12 @@ class ArchiveThumbnailUploadJob private constructor(
private const val MAX_PIXEL_DIMENSION = 256
private const val ADDITIONAL_QUALITY_DECREASE = 10f
/** A set of possible queues this job may use. The number of queues determines the parallelism. */
val QUEUES = setOf(
"ArchiveThumbnailUploadJob_1",
"ArchiveThumbnailUploadJob_2"
)
fun enqueueIfNecessary(attachmentId: AttachmentId) {
if (SignalStore.backup.backsUpMedia) {
AppDependencies.jobManager.add(ArchiveThumbnailUploadJob(attachmentId))
@@ -67,10 +73,11 @@ class ArchiveThumbnailUploadJob private constructor(
private constructor(attachmentId: AttachmentId) : this(
Parameters.Builder()
.setQueue("ArchiveThumbnailUploadJob")
.setQueue(QUEUES.random())
.addConstraint(NetworkConstraint.KEY)
.setLifespan(1.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED)
.setGlobalPriority(Parameters.PRIORITY_LOW)
.build(),
attachmentId
)
@@ -104,6 +111,10 @@ class ArchiveThumbnailUploadJob private constructor(
return Result.success()
}
if (isCanceled) {
return Result.failure()
}
val mediaRootBackupKey = SignalStore.backup.mediaRootBackupKey
val specResult = BackupRepository
@@ -116,6 +127,10 @@ class ArchiveThumbnailUploadJob private constructor(
)
}
if (isCanceled) {
return Result.failure()
}
val resumableUpload = when (specResult) {
is NetworkResult.Success -> {
Log.d(TAG, "Got an upload spec!")
@@ -138,6 +153,10 @@ class ArchiveThumbnailUploadJob private constructor(
}
}
if (isCanceled) {
return Result.failure()
}
val attachmentPointer = try {
buildSignalServiceAttachmentStream(thumbnailResult, resumableUpload).use { stream ->
val pointer = AppDependencies.signalServiceMessageSender.uploadAttachment(stream)
@@ -148,6 +167,10 @@ class ArchiveThumbnailUploadJob private constructor(
return Result.retry(defaultBackoff())
}
if (isCanceled) {
return Result.failure()
}
return when (val result = BackupRepository.copyThumbnailToArchive(attachmentPointer, attachment)) {
is NetworkResult.Success -> {
// save attachment thumbnail

View File

@@ -48,7 +48,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
.addConstraint(NoRemoteArchiveGarbageCollectionPendingConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.setQueue(UploadAttachmentToArchiveJob.buildQueueKey())
.setQueue(UploadAttachmentToArchiveJob.QUEUES.random())
.setQueuePriority(Parameters.PRIORITY_HIGH)
.build()
)

View File

@@ -36,7 +36,6 @@ import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
import java.io.FileNotFoundException
import java.io.IOException
import java.net.ProtocolException
import kotlin.random.Random
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.milliseconds
@@ -54,17 +53,14 @@ class UploadAttachmentToArchiveJob private constructor(
companion object {
private val TAG = Log.tag(UploadAttachmentToArchiveJob::class)
const val KEY = "UploadAttachmentToArchiveJob"
private const val MAX_JOB_QUEUES = 2
/**
* This randomly selects between one of [MAX_JOB_QUEUES] queues. It's a fun way of limiting the concurrency of the upload jobs to
* take up at most two job runners.
*/
fun buildQueueKey(
queue: Int = Random.nextInt(0, MAX_JOB_QUEUES)
) = "ArchiveAttachmentJobs_$queue"
fun getAllQueueKeys() = (0 until MAX_JOB_QUEUES).map { buildQueueKey(queue = it) }
/** A set of possible queues this job may use. The number of queues determines the parallelism. */
val QUEUES = setOf(
"ArchiveAttachmentJobs_1",
"ArchiveAttachmentJobs_2",
"ArchiveAttachmentJobs_3",
"ArchiveAttachmentJobs_4"
)
}
constructor(attachmentId: AttachmentId, canReuseUpload: Boolean = true) : this(
@@ -75,7 +71,7 @@ class UploadAttachmentToArchiveJob private constructor(
.addConstraint(NetworkConstraint.KEY)
.setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED)
.setQueue(buildQueueKey())
.setQueue(QUEUES.random())
.build()
)

View File

@@ -3,7 +3,6 @@ package org.thoughtcrime.securesms.keyvalue
import org.signal.ringrtc.CallManager.DataMode
import org.thoughtcrime.securesms.BuildConfig
import org.thoughtcrime.securesms.backup.v2.proto.BackupDebugInfo
import org.thoughtcrime.securesms.util.Environment
import org.thoughtcrime.securesms.util.Environment.Calling.defaultSfuUrl
import org.thoughtcrime.securesms.util.RemoteConfig