From b3d2e31bae7deee52aeb520b78091d1ca7383459 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 21 Aug 2025 11:56:14 -0400 Subject: [PATCH] Add dynamic scaling to our JobRunner system. --- .../securesms/jobmanager/JobController.java | 131 +++- .../securesms/jobmanager/JobLogger.kt | 4 +- .../securesms/jobmanager/JobManager.java | 99 +-- .../securesms/jobmanager/JobRunner.java | 27 +- .../jobmanager/persistence/JobStorage.kt | 3 + .../securesms/jobs/FastJobStorage.kt | 21 + .../securesms/jobs/RestoreAttachmentJob.kt | 6 +- .../jobs/UploadAttachmentToArchiveJob.kt | 16 +- .../securesms/jobmanager/JobControllerTest.kt | 563 ++++++++++++++++++ .../securesms/jobs/FastJobStorageTest.kt | 8 + 10 files changed, 810 insertions(+), 68 deletions(-) create mode 100644 app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobControllerTest.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 012c85b2bb..588875f37d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -4,6 +4,7 @@ import android.app.Application; import androidx.annotation.NonNull; import androidx.annotation.Nullable; +import androidx.annotation.VisibleForTesting; import androidx.annotation.WorkerThread; import com.annimon.stream.Collectors; @@ -27,6 +28,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; /** @@ -37,6 +41,8 @@ class JobController { private static final String TAG = Log.tag(JobController.class); + private static final Predicate NO_PREDICATE = spec -> true; + private final Application application; private final JobStorage jobStorage; private final JobInstantiator jobInstantiator; @@ -47,6 +53,18 @@ class JobController { private final Callback callback; private final Map runningJobs; + private final int minGeneralRunners; + private final int maxGeneralRunners; + private final long generalRunnerIdleTimeout; + private final AtomicInteger nextRunnerId; + private final List> reservedRunnerPredicates; + + @VisibleForTesting + final AtomicBoolean runnersStarted = new AtomicBoolean(false); + + @VisibleForTesting + final List activeGeneralRunners; + JobController(@NonNull Application application, @NonNull JobStorage jobStorage, @NonNull JobInstantiator jobInstantiator, @@ -54,17 +72,27 @@ class JobController { @NonNull JobTracker jobTracker, @NonNull Scheduler scheduler, @NonNull Debouncer debouncer, - @NonNull Callback callback) + @NonNull Callback callback, + int minGeneralRunners, + int maxGeneralRunners, + long generalRunnerIdleTimeout, + @NonNull List> reservedRunnerPredicates) { - this.application = application; - this.jobStorage = jobStorage; - this.jobInstantiator = jobInstantiator; - this.constraintInstantiator = constraintInstantiator; - this.jobTracker = jobTracker; - this.scheduler = scheduler; - this.debouncer = debouncer; - this.callback = callback; - this.runningJobs = new HashMap<>(); + this.application = application; + this.jobStorage = jobStorage; + this.jobInstantiator = jobInstantiator; + this.constraintInstantiator = constraintInstantiator; + this.jobTracker = jobTracker; + this.scheduler = scheduler; + this.debouncer = debouncer; + this.callback = callback; + this.runningJobs = new HashMap<>(); + this.minGeneralRunners = minGeneralRunners; + this.maxGeneralRunners = maxGeneralRunners; + this.generalRunnerIdleTimeout = generalRunnerIdleTimeout; + this.nextRunnerId = new AtomicInteger(0); + this.activeGeneralRunners = new CopyOnWriteArrayList<>(); + this.reservedRunnerPredicates = new ArrayList<>(reservedRunnerPredicates); } @WorkerThread @@ -116,6 +144,7 @@ class JobController { synchronized (this) { notifyAll(); + maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis())); } } @@ -167,6 +196,7 @@ class JobController { synchronized (this) { notifyAll(); + maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis())); } } @@ -202,6 +232,7 @@ class JobController { synchronized (this) { notifyAll(); + maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis())); } } @@ -337,20 +368,30 @@ class JobController { * - Has no dependencies * - Has no unmet constraints * - * This method will block until a job is available. - * When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}. + * @param predicate Filter for jobs to consider + * @param timeoutMs Maximum time to wait for a job. If 0, waits indefinitely. + * @return Job to execute, or null if the timeout is hit */ @WorkerThread - synchronized @NonNull Job pullNextEligibleJobForExecution(@NonNull Predicate predicate) { + synchronized @Nullable Job pullNextEligibleJobForExecution(@NonNull Predicate predicate, long timeoutMs) { try { Job job; + long startTime = System.currentTimeMillis(); while ((job = getNextEligibleJobForExecution(predicate)) == null) { if (runningJobs.isEmpty()) { debouncer.publish(callback::onEmpty); } - wait(); + if (timeoutMs > 0) { + long remainingTime = timeoutMs - (System.currentTimeMillis() - startTime); + if (remainingTime <= 0) { + return null; + } + wait(remainingTime); + } else { + wait(); + } } jobStorage.markJobAsRunning(job.getId(), System.currentTimeMillis()); @@ -411,6 +452,68 @@ class JobController { return jobStorage.areQueuesEmpty(queueKeys); } + /** + * Initializes the dynamic JobRunner system with minimum threads. + */ + @WorkerThread + synchronized void startJobRunners() { + Log.i(TAG, "Starting JobRunners. (Reserved: " + reservedRunnerPredicates.size() + ", MinGeneral: " + minGeneralRunners + ", MaxGeneral: " + maxGeneralRunners + ", GeneralIdleTimeout: " + generalRunnerIdleTimeout + " ms)"); + runnersStarted.set(true); + + for (Predicate predicate : reservedRunnerPredicates) { + int id = nextRunnerId.incrementAndGet(); + JobRunner runner = new JobRunner(application, id, this, predicate == null ? NO_PREDICATE : predicate, 0); + runner.start(); + Log.i(TAG, "Spawned new reserved JobRunner[" + id + "]"); + } + + for (int i = 0; i < minGeneralRunners; i++) { + spawnGeneralRunner(0); + } + + maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis())); + + notifyAll(); + } + + /** + * Scales up the number of {@link JobRunner}s to satisfy the number of eligible jobs, if needed. + */ + @VisibleForTesting + synchronized void maybeScaleUpRunners(int eligibleJobCount) { + if (!runnersStarted.get()) { + return; + } + + int activeRunners = this.activeGeneralRunners.size(); + int maxPossibleRunnersToSpawn = maxGeneralRunners - activeRunners; + int runnersToCoverEligibleJobs = eligibleJobCount - activeRunners; + int actualRunnersToSpawn = Math.min(runnersToCoverEligibleJobs, maxPossibleRunnersToSpawn); + + if (actualRunnersToSpawn > 0) { + Log.i(TAG, "Spawning " + actualRunnersToSpawn + " new JobRunner(s) to meet demand. (CurrentActive: " + activeRunners + ", EligibleJobs: " + eligibleJobCount + ", MaxAllowed: " + maxGeneralRunners + ")"); + + for (int i = 0; i < actualRunnersToSpawn; i++) { + spawnGeneralRunner(generalRunnerIdleTimeout); + } + } + } + + private synchronized void spawnGeneralRunner(long timeOutMs) { + int id = nextRunnerId.incrementAndGet(); + JobRunner runner = new JobRunner(application, id, this, NO_PREDICATE, timeOutMs); + runner.start(); + activeGeneralRunners.add(runner); + + Log.d(TAG, "Spawned new " + (timeOutMs == 0 ? "core" : "temporary") + " general JobRunner[" + id + "] (CurrentActive: " + activeGeneralRunners.size() + ")"); + } + + @VisibleForTesting + synchronized void onRunnerTerminated(@NonNull JobRunner runner) { + activeGeneralRunners.remove(runner); + Log.i(TAG, "JobRunner[" + runner.getId() + "] terminated. (CurrentActive: " + activeGeneralRunners.size() + ")"); + } + @WorkerThread private boolean chainExceedsMaximumInstances(@NonNull List> chain) { if (chain.size() == 1 && chain.get(0).size() == 1) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt index f70becf54b..88e4eb840c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt @@ -1,7 +1,5 @@ package org.thoughtcrime.securesms.jobmanager -import android.text.TextUtils - /** * Provides utilities to create consistent logging for jobs. */ @@ -15,7 +13,7 @@ object JobLogger { @JvmStatic fun format(job: Job, extraTag: String, event: String): String { val id = job.id - val tag = if (TextUtils.isEmpty(extraTag)) "" else "[$extraTag]" + val tag = if (extraTag.isBlank()) "" else "[$extraTag]" val timeSinceSubmission = System.currentTimeMillis() - job.parameters.createTime val runAttempt = job.runAttempt + 1 val maxAttempts = if (job.parameters.maxAttempts == Job.Parameters.UNLIMITED) "Unlimited" else job.parameters.maxAttempts.toString() diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index c562d342f6..6fa0135b16 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -47,8 +47,6 @@ public class JobManager implements ConstraintObserver.Notifier { public static final int CURRENT_VERSION = 12; - private static final Predicate NO_PREDICATE = spec -> true; - private final Application application; private final Configuration configuration; private final Executor executor; @@ -76,7 +74,11 @@ public class JobManager implements ConstraintObserver.Notifier { Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application) : new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)), new Debouncer(500), - this::onEmptyQueue); + this::onEmptyQueue, + configuration.getMinGeneralRunners(), + configuration.getMaxGeneralRunners(), + configuration.getGeneralRunnerIdleTimeout(), + configuration.getReservedJobRunners()); executor.execute(() -> { synchronized (this) { @@ -111,17 +113,8 @@ public class JobManager implements ConstraintObserver.Notifier { * Begins the execution of jobs. */ public void beginJobLoop() { - runOnExecutor(()-> { - int id = 0; - - for (int i = 0; i < configuration.getJobThreadCount(); i++) { - new JobRunner(application, ++id, jobController, NO_PREDICATE).start(); - } - - for (Predicate predicate : configuration.getReservedJobRunners()) { - new JobRunner(application, ++id, jobController, predicate).start(); - } - + runOnExecutor(() -> { + jobController.startJobRunners(); jobController.wakeUp(); }); } @@ -596,7 +589,9 @@ public class JobManager implements ConstraintObserver.Notifier { public static class Configuration { private final ExecutorFactory executorFactory; - private final int jobThreadCount; + private final int minGeneralRunners; + private final int maxGeneralRunners; + private final long generalRunnerIdleTimeout; private final JobInstantiator jobInstantiator; private final ConstraintInstantiator constraintInstantiator; private final List constraintObservers; @@ -605,7 +600,9 @@ public class JobManager implements ConstraintObserver.Notifier { private final JobTracker jobTracker; private final List> reservedJobRunners; - private Configuration(int jobThreadCount, + private Configuration(int minGeneralRunners, + int maxGeneralRunners, + long generalRunnerIdleTimeout, @NonNull ExecutorFactory executorFactory, @NonNull JobInstantiator jobInstantiator, @NonNull ConstraintInstantiator constraintInstantiator, @@ -615,19 +612,29 @@ public class JobManager implements ConstraintObserver.Notifier { @NonNull JobTracker jobTracker, @NonNull List> reservedJobRunners) { - this.executorFactory = executorFactory; - this.jobThreadCount = jobThreadCount; - this.jobInstantiator = jobInstantiator; - this.constraintInstantiator = constraintInstantiator; - this.constraintObservers = new ArrayList<>(constraintObservers); - this.jobStorage = jobStorage; - this.jobMigrator = jobMigrator; - this.jobTracker = jobTracker; - this.reservedJobRunners = new ArrayList<>(reservedJobRunners); + this.executorFactory = executorFactory; + this.minGeneralRunners = minGeneralRunners; + this.maxGeneralRunners = maxGeneralRunners; + this.generalRunnerIdleTimeout = generalRunnerIdleTimeout; + this.jobInstantiator = jobInstantiator; + this.constraintInstantiator = constraintInstantiator; + this.constraintObservers = new ArrayList<>(constraintObservers); + this.jobStorage = jobStorage; + this.jobMigrator = jobMigrator; + this.jobTracker = jobTracker; + this.reservedJobRunners = new ArrayList<>(reservedJobRunners); } - int getJobThreadCount() { - return jobThreadCount; + int getMinGeneralRunners() { + return minGeneralRunners; + } + + int getMaxGeneralRunners() { + return maxGeneralRunners; + } + + long getGeneralRunnerIdleTimeout() { + return generalRunnerIdleTimeout; } @NonNull ExecutorFactory getExecutorFactory() { @@ -665,18 +672,30 @@ public class JobManager implements ConstraintObserver.Notifier { public static class Builder { - private ExecutorFactory executorFactory = new DefaultExecutorFactory(); - private int jobThreadCount = 8; - private Map jobFactories = new HashMap<>(); - private Map constraintFactories = new HashMap<>(); - private List constraintObservers = new ArrayList<>(); - private JobStorage jobStorage = null; - private JobMigrator jobMigrator = null; - private JobTracker jobTracker = new JobTracker(); - private List> reservedJobRunners = new ArrayList<>(); + private ExecutorFactory executorFactory = new DefaultExecutorFactory(); + private int minGeneralRunners = 4; + private int maxGeneralRunners = 16; + private long generalRunnerIdleTimeout = TimeUnit.MINUTES.toMillis(1); + private Map jobFactories = new HashMap<>(); + private Map constraintFactories = new HashMap<>(); + private List constraintObservers = new ArrayList<>(); + private JobStorage jobStorage = null; + private JobMigrator jobMigrator = null; + private JobTracker jobTracker = new JobTracker(); + private List> reservedJobRunners = new ArrayList<>(); - public @NonNull Builder setJobThreadCount(int jobThreadCount) { - this.jobThreadCount = jobThreadCount; + public @NonNull Builder setMinGeneralRunners(int minGeneralRunners) { + this.minGeneralRunners = minGeneralRunners; + return this; + } + + public @NonNull Builder setMaxGeneralRunners(int maxGeneralRunners) { + this.maxGeneralRunners = maxGeneralRunners; + return this; + } + + public @NonNull Builder setGeneralRunnerIdleTimeout(long generalRunnerIdleTimeout) { + this.generalRunnerIdleTimeout = generalRunnerIdleTimeout; return this; } @@ -716,7 +735,9 @@ public class JobManager implements ConstraintObserver.Notifier { } public @NonNull Configuration build() { - return new Configuration(jobThreadCount, + return new Configuration(minGeneralRunners, + maxGeneralRunners, + generalRunnerIdleTimeout, executorFactory, new JobInstantiator(jobFactories), new ConstraintInstantiator(constraintFactories), diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java index df023ee807..55b40544ba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobRunner.java @@ -5,8 +5,6 @@ import android.os.PowerManager; import androidx.annotation.NonNull; -import com.annimon.stream.Stream; - import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.jobs.MinimalJobSpec; import org.thoughtcrime.securesms.util.WakeLockUtil; @@ -33,21 +31,36 @@ class JobRunner extends Thread { private final int id; private final JobController jobController; private final Predicate jobPredicate; + private final long idleTimeoutMs; - JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull Predicate predicate) { - super("signal-JobRunner-" + id); + /** + * @param idleTimeoutMs If the runner experiences no activity within this duration, it will terminate. If set to 0, it will never terminate. + */ + JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull Predicate predicate, long idleTimeoutMs) { + super("JobRunner-" + (idleTimeoutMs == 0 ? "core-" : "temp-") + id); this.application = application; this.id = id; this.jobController = jobController; this.jobPredicate = predicate; + this.idleTimeoutMs = idleTimeoutMs; } @Override public synchronized void run() { - //noinspection InfiniteLoopStatement + Log.i(TAG, "JobRunner " + id + " started" + (idleTimeoutMs > 0 ? " with idle timeout " + idleTimeoutMs + "ms" : " with no idle timeout")); + while (true) { - Job job = jobController.pullNextEligibleJobForExecution(jobPredicate); + Job job = jobController.pullNextEligibleJobForExecution(jobPredicate, idleTimeoutMs); + if (job == null && idleTimeoutMs > 0) { + Log.i(TAG, "JobRunner " + id + " terminating due to inactivity"); + jobController.onRunnerTerminated(this); + break; + } else if (job == null) { + Log.i(TAG, "JobRunner " + id + " unexpectedly given a null job. Going around the loop."); + continue; + } + Job.Result result = run(job); jobController.onJobFinished(job); @@ -60,7 +73,7 @@ class JobRunner extends Thread { } else if (result.isFailure()) { List dependents = jobController.onFailure(job); job.onFailure(); - Stream.of(dependents).forEach(Job::onFailure); + dependents.stream().forEach(Job::onFailure); if (result.getException() != null) { throw result.getException(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt index 64497fae7c..f28143a8ad 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt @@ -20,6 +20,9 @@ interface JobStorage { @WorkerThread fun getNextEligibleJob(currentTime: Long, filter: (MinimalJobSpec) -> Boolean): JobSpec? + @WorkerThread + fun getEligibleJobCount(currentTime: Long): Int + @WorkerThread fun getJobsInQueue(queue: String): List diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt index 7d59f4c260..46fdd58ddc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt @@ -148,6 +148,27 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } } + @Synchronized + override fun getEligibleJobCount(currentTime: Long): Int { + val migrationJob: MinimalJobSpec? = migrationJobs.firstOrNull() + + return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) { + 1 + } else if (migrationJob != null) { + 0 + } else { + eligibleJobs + .asSequence() + .filter { job -> + // Filter out all jobs with unmet dependencies + dependenciesByJobId[job.id].isNullOrEmpty() + } + .filterNot { it.isRunning } + .filter { job -> job.hasEligibleRunTime(currentTime) } + .count() + } + } + @Synchronized override fun getJobsInQueue(queue: String): List { return minimalJobs diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt index 8048b60450..192e472e8d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt @@ -76,7 +76,11 @@ class RestoreAttachmentJob private constructor( "RestoreAttachmentJob::InitialRestore_01", "RestoreAttachmentJob::InitialRestore_02", "RestoreAttachmentJob::InitialRestore_03", - "RestoreAttachmentJob::InitialRestore_04" + "RestoreAttachmentJob::InitialRestore_04", + "RestoreAttachmentJob::InitialRestore_05", + "RestoreAttachmentJob::InitialRestore_06", + "RestoreAttachmentJob::InitialRestore_07", + "RestoreAttachmentJob::InitialRestore_08" ) /** Job queues used when restoring an offloaded attachment. The number of queues in this set determine the level of parallelization. */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index ca33298ace..56e82be4a7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -56,10 +56,18 @@ class UploadAttachmentToArchiveJob private constructor( /** A set of possible queues this job may use. The number of queues determines the parallelism. */ val QUEUES = setOf( - "ArchiveAttachmentJobs_1", - "ArchiveAttachmentJobs_2", - "ArchiveAttachmentJobs_3", - "ArchiveAttachmentJobs_4" + "ArchiveAttachmentJobs_01", + "ArchiveAttachmentJobs_02", + "ArchiveAttachmentJobs_03", + "ArchiveAttachmentJobs_04", + "ArchiveAttachmentJobs_05", + "ArchiveAttachmentJobs_06", + "ArchiveAttachmentJobs_07", + "ArchiveAttachmentJobs_08", + "ArchiveAttachmentJobs_09", + "ArchiveAttachmentJobs_10", + "ArchiveAttachmentJobs_11", + "ArchiveAttachmentJobs_12" ) } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobControllerTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobControllerTest.kt new file mode 100644 index 0000000000..652b6da89f --- /dev/null +++ b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobControllerTest.kt @@ -0,0 +1,563 @@ +package org.thoughtcrime.securesms.jobmanager + +import android.app.Application +import assertk.assertThat +import assertk.assertions.hasSize +import assertk.assertions.isEqualTo +import assertk.assertions.isNull +import io.mockk.MockKAnnotations +import io.mockk.every +import io.mockk.impl.annotations.MockK +import io.mockk.mockk +import io.mockk.verify +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec +import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec +import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage +import org.thoughtcrime.securesms.util.Debouncer +import kotlin.time.Duration.Companion.seconds + +@Ignore("When running tests in bulk, this causes the JVM to OOM, I think because we're creating lots of threads that don't get cleaned up, and I haven't figured out a nice way to fix it yet.") +class JobControllerTest { + + @MockK + private lateinit var application: Application + + @MockK + private lateinit var jobStorage: JobStorage + + @MockK + private lateinit var jobInstantiator: JobInstantiator + + @MockK + private lateinit var constraintInstantiator: ConstraintInstantiator + + @MockK + private lateinit var jobTracker: JobTracker + + @MockK + private lateinit var scheduler: Scheduler + + @MockK + private lateinit var debouncer: Debouncer + + @MockK + private lateinit var callback: JobController.Callback + + private lateinit var jobController: JobController + + companion object { + private const val MIN_RUNNERS = 2 + private const val MAX_RUNNERS = 5 + } + + @Before + fun setup() { + MockKAnnotations.init(this, relaxed = true) + + // Mock default behavior + every { jobStorage.updateAllJobsToBePending() } returns Unit + every { debouncer.publish(any()) } returns Unit + + jobController = JobController( + application, + jobStorage, + jobInstantiator, + constraintInstantiator, + jobTracker, + scheduler, + debouncer, + callback, + MIN_RUNNERS, + MAX_RUNNERS, + 1.seconds.inWholeMilliseconds, + emptyList() + ) + } + + @Test + fun `init updates all jobs to pending`() { + // When + jobController.init() + + // Then + verify { jobStorage.updateAllJobsToBePending() } + } + + @Test + fun `submitNewJobChain inserts jobs and schedules them`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") + val chain = listOf(listOf(testJob)) + + every { jobStorage.insertJobs(any()) } returns Unit + every { scheduler.schedule(any(), any>()) } returns Unit + + // When + jobController.submitNewJobChain(chain) + + // Then + verify { jobStorage.insertJobs(any()) } + verify { scheduler.schedule(0L, emptyList()) } + verify { testJob.onSubmit() } + } + + @Test + fun `submitNewJobChain handles chain that exceeds maximum instances`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") { params -> + every { params.maxInstancesForFactory } returns 1 + } + every { jobStorage.getJobCountForFactory("TestFactory") } returns 1 + + val chain = listOf(listOf(testJob)) + + // When + jobController.submitNewJobChain(chain) + + // Then + verify { jobTracker.onStateChange(testJob, JobTracker.JobState.IGNORED) } + verify(exactly = 0) { jobStorage.insertJobs(any()) } + } + + @Test + fun `submitJobWithExistingDependencies handles failed dependencies`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") + val dependsOn = setOf("failed-job-id") + + every { jobTracker.haveAnyFailed(dependsOn) } returns true + every { jobStorage.getJobSpec("failed-job-id") } returns null + + // When + jobController.submitJobWithExistingDependencies(testJob, dependsOn, null) + + // Then + verify { testJob.onFailure() } + verify(exactly = 0) { jobStorage.insertJobs(any()) } + } + + @Test + fun `cancelJob handles unknown job`() { + // Given + every { jobStorage.getJobSpec("unknown-job") } returns null + + // When + jobController.cancelJob("unknown-job") + + // Then - Should not crash + verify(exactly = 0) { jobStorage.deleteJob(any()) } + } + + @Test + fun `pullNextEligibleJobForExecution with timeout returns null when no jobs available`() { + // Given + every { jobStorage.getNextEligibleJob(any(), any()) } returns null + + // When + val result = jobController.pullNextEligibleJobForExecution({ true }, 100) + + // Then + assertThat(result).isNull() + } + + @Test + fun `pullNextEligibleJobForExecution marks job as running and tracks it`() { + // Given + val jobSpec = createJobSpec("test-job-1", "TestFactory") + val testJob = createTestJob("test-job-1", "TestFactory") + + every { jobStorage.getNextEligibleJob(any(), any()) } returns jobSpec + every { jobStorage.getConstraintSpecs("test-job-1") } returns emptyList() + every { jobInstantiator.instantiate("TestFactory", any(), any()) } returns testJob + every { jobStorage.markJobAsRunning("test-job-1", any()) } returns Unit + + // When + val result = jobController.pullNextEligibleJobForExecution({ true }, 0) + + // Then + assertThat(result).isEqualTo(testJob) + verify { jobStorage.markJobAsRunning("test-job-1", any()) } + verify { jobTracker.onStateChange(testJob, JobTracker.JobState.RUNNING) } + } + + @Test + fun `onSuccess deletes job and updates tracker`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") + every { jobStorage.getDependencySpecsThatDependOnJob("test-job-1") } returns emptyList() + every { jobStorage.deleteJob("test-job-1") } returns Unit + + // When + jobController.onSuccess(testJob, null) + + // Then + verify { jobStorage.deleteJob("test-job-1") } + verify { jobTracker.onStateChange(testJob, JobTracker.JobState.SUCCESS) } + } + + @Test + fun `onSuccess with output data updates dependent jobs`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") + val outputData = "test-output".toByteArray() + val dependentSpec = DependencySpec("dependent-job", "test-job-1", false) + val dependentJobSpec = createJobSpec("dependent-job", "DependentFactory") + + every { jobStorage.getDependencySpecsThatDependOnJob("test-job-1") } returns listOf(dependentSpec) + every { jobStorage.getJobSpec("dependent-job") } returns dependentJobSpec + every { jobStorage.updateJobs(any()) } returns Unit + every { jobStorage.deleteJob("test-job-1") } returns Unit + + // When + jobController.onSuccess(testJob, outputData) + + // Then + verify { jobStorage.updateJobs(any()) } + verify { jobStorage.deleteJob("test-job-1") } + } + + @Test + fun `onFailure deletes job and all dependents`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") + val dependentSpec = DependencySpec("dependent-job", "test-job-1", false) + val dependentJobSpec = createJobSpec("dependent-job", "DependentFactory") + val dependentJob = createTestJob("dependent-job", "DependentFactory") + + every { jobStorage.getDependencySpecsThatDependOnJob("test-job-1") } returns listOf(dependentSpec) + every { jobStorage.getJobSpec("dependent-job") } returns dependentJobSpec + every { jobStorage.getConstraintSpecs("dependent-job") } returns emptyList() + every { jobInstantiator.instantiate("DependentFactory", any(), any()) } returns dependentJob + every { jobStorage.deleteJobs(any()) } returns Unit + + // When + val dependents = jobController.onFailure(testJob) + + // Then + assertThat(dependents).hasSize(1) + assertThat(dependents[0]).isEqualTo(dependentJob) + verify { jobStorage.deleteJobs(listOf("test-job-1", "dependent-job")) } + verify { jobTracker.onStateChange(testJob, JobTracker.JobState.FAILURE) } + verify { jobTracker.onStateChange(dependentJob, JobTracker.JobState.FAILURE) } + } + + @Test + fun `onRetry updates job with backoff and schedules retry`() { + // Given + val testJob = createTestJob("test-job-1", "TestFactory") + val backoffInterval = 5000L + + every { jobStorage.updateJobAfterRetry(any(), any(), any(), any(), any()) } returns Unit + every { jobStorage.getConstraintSpecs("test-job-1") } returns emptyList() + every { scheduler.schedule(backoffInterval, emptyList()) } returns Unit + + // When + jobController.onRetry(testJob, backoffInterval) + + // Then + verify { jobStorage.updateJobAfterRetry("test-job-1", any(), 1, backoffInterval, any()) } + verify { jobTracker.onStateChange(testJob, JobTracker.JobState.PENDING) } + verify { scheduler.schedule(backoffInterval, emptyList()) } + } + + @Test + fun `submitJobs filters out jobs that exceed maximum instances`() { + // Given + val validJob = createTestJob("valid-job", "ValidFactory") { params -> + every { params.maxInstancesForFactory } returns 5 + every { params.maxInstancesForQueue } returns Job.Parameters.UNLIMITED + } + val invalidJob = createTestJob("invalid-job", "InvalidFactory") { params -> + every { params.maxInstancesForFactory } returns 1 + every { params.maxInstancesForQueue } returns Job.Parameters.UNLIMITED + } + + every { jobStorage.getJobCountForFactory("ValidFactory") } returns 2 + every { jobStorage.getJobCountForFactory("InvalidFactory") } returns 1 + + every { jobStorage.insertJobs(any()) } returns Unit + every { scheduler.schedule(any(), any>()) } returns Unit + + // When + jobController.submitJobs(listOf(validJob, invalidJob)) + + // Then + verify { jobTracker.onStateChange(invalidJob, JobTracker.JobState.IGNORED) } + verify { jobStorage.insertJobs(any()) } + verify { validJob.onSubmit() } + verify(exactly = 0) { invalidJob.onSubmit() } + } + + @Test + fun `submitJobs handles empty list after filtering`() { + // Given + val invalidJob = createTestJob("invalid-job", "InvalidFactory") { params -> + every { params.maxInstancesForFactory } returns 1 + } + every { jobStorage.getJobCountForFactory("InvalidFactory") } returns 1 + + // When + jobController.submitJobs(listOf(invalidJob)) + + // Then + verify(exactly = 0) { jobStorage.insertJobs(any()) } + verify(exactly = 0) { scheduler.schedule(any(), any>()) } + } + + @Test + fun `pullNextEligibleJobForExecution publishes empty callback when no running jobs`() { + // Given + every { jobStorage.getNextEligibleJob(any(), any()) } returns null + + // When + jobController.pullNextEligibleJobForExecution({ true }, 10) + + // Then + verify { debouncer.publish(any()) } + } + + @Test + fun `findJobs returns filtered results from storage`() { + // Given + val predicate: (JobSpec) -> Boolean = { it.factoryKey == "TestFactory" } + val jobSpecs = listOf( + createJobSpec("job-1", "TestFactory"), + createJobSpec("job-2", "OtherFactory") + ) + + every { jobStorage.getAllMatchingFilter(any()) } returns jobSpecs.filter(predicate) + + // When + val result = jobController.findJobs(predicate) + + // Then + assertThat(result).hasSize(1) + assertThat(result[0].id).isEqualTo("job-1") + verify { jobStorage.getAllMatchingFilter(any()) } + } + + @Test + fun `cancelJob handles job not found gracefully`() { + // Given + val nonExistentJobId = "non-existent-job" + every { jobStorage.getJobSpec(nonExistentJobId) } returns null + + // When + jobController.cancelJob(nonExistentJobId) + + // Then + verify(exactly = 0) { jobStorage.deleteJob(any()) } + verify(exactly = 0) { jobStorage.deleteJobs(any()) } + } + + @Test + fun `cancelAllInQueue cancels all jobs in specified queue`() { + // Given + val queueName = "test-queue" + val job1Spec = createJobSpec("job-1", "Factory1") + val job2Spec = createJobSpec("job-2", "Factory2") + val job1 = createTestJob("job-1", "Factory1") + val job2 = createTestJob("job-2", "Factory2") + + every { jobStorage.getJobsInQueue(queueName) } returns listOf(job1Spec, job2Spec) + every { jobStorage.getJobSpec("job-1") } returns job1Spec + every { jobStorage.getJobSpec("job-2") } returns job2Spec + every { jobStorage.getConstraintSpecs(any()) } returns emptyList() + every { jobInstantiator.instantiate("Factory1", any(), any()) } returns job1 + every { jobInstantiator.instantiate("Factory2", any(), any()) } returns job2 + every { jobStorage.getDependencySpecsThatDependOnJob(any()) } returns emptyList() + every { jobStorage.deleteJob(any()) } returns Unit + every { jobStorage.deleteJobs(any()) } returns Unit + + // When + jobController.cancelAllInQueue(queueName) + + // Then + verify { job1.cancel() } + verify { job2.cancel() } + verify { job1.onFailure() } + verify { job2.onFailure() } + } + + @Test + fun `onFailure handles cascading failures correctly`() { + // Given - Create a chain: job1 -> job2 -> job3 + val job1 = createTestJob("job-1", "Factory1") + val job2Spec = createJobSpec("job-2", "Factory2") + val job3Spec = createJobSpec("job-3", "Factory3") + val job2 = createTestJob("job-2", "Factory2") + val job3 = createTestJob("job-3", "Factory3") + + val job2Dependency = DependencySpec("job-2", "job-1", false) + val job3Dependency = DependencySpec("job-3", "job-2", false) + + every { jobStorage.getDependencySpecsThatDependOnJob("job-1") } returns listOf(job2Dependency) + every { jobStorage.getDependencySpecsThatDependOnJob("job-2") } returns listOf(job3Dependency) + every { jobStorage.getDependencySpecsThatDependOnJob("job-3") } returns emptyList() + + every { jobStorage.getJobSpec("job-2") } returns job2Spec + every { jobStorage.getJobSpec("job-3") } returns job3Spec + every { jobStorage.getConstraintSpecs(any()) } returns emptyList() + every { jobInstantiator.instantiate("Factory2", any(), any()) } returns job2 + every { jobInstantiator.instantiate("Factory3", any(), any()) } returns job3 + + every { jobStorage.deleteJobs(any()) } returns Unit + + // When + val dependents = jobController.onFailure(job1) + + // Then + assertThat(dependents).hasSize(1) + assertThat(dependents[0]).isEqualTo(job2) + verify { jobStorage.deleteJobs(listOf("job-1", "job-2")) } + verify { jobTracker.onStateChange(job1, JobTracker.JobState.FAILURE) } + verify { jobTracker.onStateChange(job2, JobTracker.JobState.FAILURE) } + } + + @Test + fun `onFailure handles null job specs in dependency chain`() { + // Given + val job1 = createTestJob("job-1", "Factory1") + val job2Dependency = DependencySpec("job-2", "job-1", false) + val job3Dependency = DependencySpec("job-3", "job-1", false) + + every { jobStorage.getDependencySpecsThatDependOnJob("job-1") } returns listOf(job2Dependency, job3Dependency) + every { jobStorage.getJobSpec("job-2") } returns null // Job was already deleted + every { jobStorage.getJobSpec("job-3") } returns createJobSpec("job-3", "Factory3") + every { jobStorage.getConstraintSpecs("job-3") } returns emptyList() + every { jobInstantiator.instantiate("Factory3", any(), any()) } returns createTestJob("job-3", "Factory3") + every { jobStorage.deleteJobs(any()) } returns Unit + + // When + val dependents = jobController.onFailure(job1) + + // Then - Should handle null job spec gracefully + assertThat(dependents).hasSize(1) + assertThat(dependents[0].id).isEqualTo("job-3") + } + + @Test + fun `startJobRunners - creates minimum number of runners, even with no eligible jobs`() { + // Given + every { jobStorage.getEligibleJobCount(any()) } returns 0 + + // When + jobController.startJobRunners() + + // Then + assertThat(jobController.activeGeneralRunners.size).isEqualTo(MIN_RUNNERS) + } + + @Test + fun `startJobRunners - creates runners to satisfy demand`() { + // Given + every { jobStorage.getEligibleJobCount(any()) } returns MAX_RUNNERS + + // When + jobController.startJobRunners() + + // Then + assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS) + } + + @Test + fun `startJobRunners - does not exceed max runners`() { + // Given + every { jobStorage.getEligibleJobCount(any()) } returns MAX_RUNNERS * 2 + + // When + jobController.startJobRunners() + + // Then + assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS) + } + + @Test + fun `maybeScaleUpRunners - creates runners to satisfy demand`() { + // When + jobController.runnersStarted.set(true) + jobController.maybeScaleUpRunners(MAX_RUNNERS) + + // Then + assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS) + } + + @Test + fun `maybeScaleUpRunners - does not exceed max runners`() { + // When + jobController.runnersStarted.set(true) + jobController.maybeScaleUpRunners(MAX_RUNNERS * 2) + + // Then + assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS) + } + + @Test + fun `onRunnerTerminated - decrements active runners`() { + // Given + every { jobStorage.getEligibleJobCount(any()) } returns MAX_RUNNERS + jobController.startJobRunners() + + // When + jobController.onRunnerTerminated(jobController.activeGeneralRunners.first()) + + // Then + assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS - 1) + } + + /** + * @param parameterConfig Allows you to mock out specific fields on the [Job.Parameters]. + */ + private fun createTestJob( + id: String, + factoryKey: String, + parameterConfig: ((Job.Parameters) -> Unit)? = null + ): Job { + val job = mockk(relaxed = true) + every { job.id } returns id + every { job.factoryKey } returns factoryKey + every { job.runAttempt } returns 0 + every { job.serialize() } returns null + every { job.parameters } returns createJobParameters(id, parameterConfig) + return job + } + + private fun createJobParameters( + id: String, + config: ((Job.Parameters) -> Unit)? = null + ): Job.Parameters { + val params = mockk(relaxed = true) + every { params.id } returns id + every { params.maxInstancesForFactory } returns Job.Parameters.UNLIMITED + every { params.maxInstancesForQueue } returns Job.Parameters.UNLIMITED + every { params.queue } returns null + every { params.constraintKeys } returns emptyList() + every { params.initialDelay } returns 0L + config?.invoke(params) + return params + } + + private fun createJobSpec(id: String, factoryKey: String): JobSpec { + return JobSpec( + id = id, + factoryKey = factoryKey, + queueKey = null, + createTime = System.currentTimeMillis(), + lastRunAttemptTime = 0L, + nextBackoffInterval = 0L, + runAttempt = 0, + maxAttempts = 3, + lifespan = -1L, + serializedData = null, + serializedInputData = null, + isRunning = false, + isMemoryOnly = false, + globalPriority = 0, + queuePriority = 0, + initialDelay = 0L + ) + } +} diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt index 98c4fa77a1..40a20c609b 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt @@ -829,6 +829,14 @@ class FastJobStorageTest { assertThat(subject.getNextEligibleJob(20, NO_PREDICATE)).isEqualTo(fullSpec1.jobSpec) } + @Test + fun `getEligibleJobCount - general`() { + val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS)) + subject.init() + + assertThat(subject.getEligibleJobCount(0)).isEqualTo(1) + } + @Test fun `deleteJobs - writes to database`() { val database = mockDatabase(DataSet1.FULL_SPECS)