Add dynamic scaling to our JobRunner system.

This commit is contained in:
Greyson Parrelli
2025-08-21 11:56:14 -04:00
committed by Jeffrey Starke
parent c117082f23
commit b3d2e31bae
10 changed files with 810 additions and 68 deletions

View File

@@ -4,6 +4,7 @@ import android.app.Application;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import androidx.annotation.WorkerThread;
import com.annimon.stream.Collectors;
@@ -27,6 +28,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
/**
@@ -37,6 +41,8 @@ class JobController {
private static final String TAG = Log.tag(JobController.class);
private static final Predicate<MinimalJobSpec> NO_PREDICATE = spec -> true;
private final Application application;
private final JobStorage jobStorage;
private final JobInstantiator jobInstantiator;
@@ -47,6 +53,18 @@ class JobController {
private final Callback callback;
private final Map<String, Job> runningJobs;
private final int minGeneralRunners;
private final int maxGeneralRunners;
private final long generalRunnerIdleTimeout;
private final AtomicInteger nextRunnerId;
private final List<Predicate<MinimalJobSpec>> reservedRunnerPredicates;
@VisibleForTesting
final AtomicBoolean runnersStarted = new AtomicBoolean(false);
@VisibleForTesting
final List<JobRunner> activeGeneralRunners;
JobController(@NonNull Application application,
@NonNull JobStorage jobStorage,
@NonNull JobInstantiator jobInstantiator,
@@ -54,17 +72,27 @@ class JobController {
@NonNull JobTracker jobTracker,
@NonNull Scheduler scheduler,
@NonNull Debouncer debouncer,
@NonNull Callback callback)
@NonNull Callback callback,
int minGeneralRunners,
int maxGeneralRunners,
long generalRunnerIdleTimeout,
@NonNull List<Predicate<MinimalJobSpec>> reservedRunnerPredicates)
{
this.application = application;
this.jobStorage = jobStorage;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.jobTracker = jobTracker;
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
this.runningJobs = new HashMap<>();
this.application = application;
this.jobStorage = jobStorage;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.jobTracker = jobTracker;
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
this.runningJobs = new HashMap<>();
this.minGeneralRunners = minGeneralRunners;
this.maxGeneralRunners = maxGeneralRunners;
this.generalRunnerIdleTimeout = generalRunnerIdleTimeout;
this.nextRunnerId = new AtomicInteger(0);
this.activeGeneralRunners = new CopyOnWriteArrayList<>();
this.reservedRunnerPredicates = new ArrayList<>(reservedRunnerPredicates);
}
@WorkerThread
@@ -116,6 +144,7 @@ class JobController {
synchronized (this) {
notifyAll();
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
}
@@ -167,6 +196,7 @@ class JobController {
synchronized (this) {
notifyAll();
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
}
@@ -202,6 +232,7 @@ class JobController {
synchronized (this) {
notifyAll();
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
}
@@ -337,20 +368,30 @@ class JobController {
* - Has no dependencies
* - Has no unmet constraints
*
* This method will block until a job is available.
* When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}.
* @param predicate Filter for jobs to consider
* @param timeoutMs Maximum time to wait for a job. If 0, waits indefinitely.
* @return Job to execute, or null if the timeout is hit
*/
@WorkerThread
synchronized @NonNull Job pullNextEligibleJobForExecution(@NonNull Predicate<MinimalJobSpec> predicate) {
synchronized @Nullable Job pullNextEligibleJobForExecution(@NonNull Predicate<MinimalJobSpec> predicate, long timeoutMs) {
try {
Job job;
long startTime = System.currentTimeMillis();
while ((job = getNextEligibleJobForExecution(predicate)) == null) {
if (runningJobs.isEmpty()) {
debouncer.publish(callback::onEmpty);
}
wait();
if (timeoutMs > 0) {
long remainingTime = timeoutMs - (System.currentTimeMillis() - startTime);
if (remainingTime <= 0) {
return null;
}
wait(remainingTime);
} else {
wait();
}
}
jobStorage.markJobAsRunning(job.getId(), System.currentTimeMillis());
@@ -411,6 +452,68 @@ class JobController {
return jobStorage.areQueuesEmpty(queueKeys);
}
/**
* Initializes the dynamic JobRunner system with minimum threads.
*/
@WorkerThread
synchronized void startJobRunners() {
Log.i(TAG, "Starting JobRunners. (Reserved: " + reservedRunnerPredicates.size() + ", MinGeneral: " + minGeneralRunners + ", MaxGeneral: " + maxGeneralRunners + ", GeneralIdleTimeout: " + generalRunnerIdleTimeout + " ms)");
runnersStarted.set(true);
for (Predicate<MinimalJobSpec> predicate : reservedRunnerPredicates) {
int id = nextRunnerId.incrementAndGet();
JobRunner runner = new JobRunner(application, id, this, predicate == null ? NO_PREDICATE : predicate, 0);
runner.start();
Log.i(TAG, "Spawned new reserved JobRunner[" + id + "]");
}
for (int i = 0; i < minGeneralRunners; i++) {
spawnGeneralRunner(0);
}
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
notifyAll();
}
/**
* Scales up the number of {@link JobRunner}s to satisfy the number of eligible jobs, if needed.
*/
@VisibleForTesting
synchronized void maybeScaleUpRunners(int eligibleJobCount) {
if (!runnersStarted.get()) {
return;
}
int activeRunners = this.activeGeneralRunners.size();
int maxPossibleRunnersToSpawn = maxGeneralRunners - activeRunners;
int runnersToCoverEligibleJobs = eligibleJobCount - activeRunners;
int actualRunnersToSpawn = Math.min(runnersToCoverEligibleJobs, maxPossibleRunnersToSpawn);
if (actualRunnersToSpawn > 0) {
Log.i(TAG, "Spawning " + actualRunnersToSpawn + " new JobRunner(s) to meet demand. (CurrentActive: " + activeRunners + ", EligibleJobs: " + eligibleJobCount + ", MaxAllowed: " + maxGeneralRunners + ")");
for (int i = 0; i < actualRunnersToSpawn; i++) {
spawnGeneralRunner(generalRunnerIdleTimeout);
}
}
}
private synchronized void spawnGeneralRunner(long timeOutMs) {
int id = nextRunnerId.incrementAndGet();
JobRunner runner = new JobRunner(application, id, this, NO_PREDICATE, timeOutMs);
runner.start();
activeGeneralRunners.add(runner);
Log.d(TAG, "Spawned new " + (timeOutMs == 0 ? "core" : "temporary") + " general JobRunner[" + id + "] (CurrentActive: " + activeGeneralRunners.size() + ")");
}
@VisibleForTesting
synchronized void onRunnerTerminated(@NonNull JobRunner runner) {
activeGeneralRunners.remove(runner);
Log.i(TAG, "JobRunner[" + runner.getId() + "] terminated. (CurrentActive: " + activeGeneralRunners.size() + ")");
}
@WorkerThread
private boolean chainExceedsMaximumInstances(@NonNull List<List<Job>> chain) {
if (chain.size() == 1 && chain.get(0).size() == 1) {

View File

@@ -1,7 +1,5 @@
package org.thoughtcrime.securesms.jobmanager
import android.text.TextUtils
/**
* Provides utilities to create consistent logging for jobs.
*/
@@ -15,7 +13,7 @@ object JobLogger {
@JvmStatic
fun format(job: Job, extraTag: String, event: String): String {
val id = job.id
val tag = if (TextUtils.isEmpty(extraTag)) "" else "[$extraTag]"
val tag = if (extraTag.isBlank()) "" else "[$extraTag]"
val timeSinceSubmission = System.currentTimeMillis() - job.parameters.createTime
val runAttempt = job.runAttempt + 1
val maxAttempts = if (job.parameters.maxAttempts == Job.Parameters.UNLIMITED) "Unlimited" else job.parameters.maxAttempts.toString()

View File

@@ -47,8 +47,6 @@ public class JobManager implements ConstraintObserver.Notifier {
public static final int CURRENT_VERSION = 12;
private static final Predicate<MinimalJobSpec> NO_PREDICATE = spec -> true;
private final Application application;
private final Configuration configuration;
private final Executor executor;
@@ -76,7 +74,11 @@ public class JobManager implements ConstraintObserver.Notifier {
Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application)
: new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)),
new Debouncer(500),
this::onEmptyQueue);
this::onEmptyQueue,
configuration.getMinGeneralRunners(),
configuration.getMaxGeneralRunners(),
configuration.getGeneralRunnerIdleTimeout(),
configuration.getReservedJobRunners());
executor.execute(() -> {
synchronized (this) {
@@ -111,17 +113,8 @@ public class JobManager implements ConstraintObserver.Notifier {
* Begins the execution of jobs.
*/
public void beginJobLoop() {
runOnExecutor(()-> {
int id = 0;
for (int i = 0; i < configuration.getJobThreadCount(); i++) {
new JobRunner(application, ++id, jobController, NO_PREDICATE).start();
}
for (Predicate<MinimalJobSpec> predicate : configuration.getReservedJobRunners()) {
new JobRunner(application, ++id, jobController, predicate).start();
}
runOnExecutor(() -> {
jobController.startJobRunners();
jobController.wakeUp();
});
}
@@ -596,7 +589,9 @@ public class JobManager implements ConstraintObserver.Notifier {
public static class Configuration {
private final ExecutorFactory executorFactory;
private final int jobThreadCount;
private final int minGeneralRunners;
private final int maxGeneralRunners;
private final long generalRunnerIdleTimeout;
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final List<ConstraintObserver> constraintObservers;
@@ -605,7 +600,9 @@ public class JobManager implements ConstraintObserver.Notifier {
private final JobTracker jobTracker;
private final List<Predicate<MinimalJobSpec>> reservedJobRunners;
private Configuration(int jobThreadCount,
private Configuration(int minGeneralRunners,
int maxGeneralRunners,
long generalRunnerIdleTimeout,
@NonNull ExecutorFactory executorFactory,
@NonNull JobInstantiator jobInstantiator,
@NonNull ConstraintInstantiator constraintInstantiator,
@@ -615,19 +612,29 @@ public class JobManager implements ConstraintObserver.Notifier {
@NonNull JobTracker jobTracker,
@NonNull List<Predicate<MinimalJobSpec>> reservedJobRunners)
{
this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.constraintObservers = new ArrayList<>(constraintObservers);
this.jobStorage = jobStorage;
this.jobMigrator = jobMigrator;
this.jobTracker = jobTracker;
this.reservedJobRunners = new ArrayList<>(reservedJobRunners);
this.executorFactory = executorFactory;
this.minGeneralRunners = minGeneralRunners;
this.maxGeneralRunners = maxGeneralRunners;
this.generalRunnerIdleTimeout = generalRunnerIdleTimeout;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.constraintObservers = new ArrayList<>(constraintObservers);
this.jobStorage = jobStorage;
this.jobMigrator = jobMigrator;
this.jobTracker = jobTracker;
this.reservedJobRunners = new ArrayList<>(reservedJobRunners);
}
int getJobThreadCount() {
return jobThreadCount;
int getMinGeneralRunners() {
return minGeneralRunners;
}
int getMaxGeneralRunners() {
return maxGeneralRunners;
}
long getGeneralRunnerIdleTimeout() {
return generalRunnerIdleTimeout;
}
@NonNull ExecutorFactory getExecutorFactory() {
@@ -665,18 +672,30 @@ public class JobManager implements ConstraintObserver.Notifier {
public static class Builder {
private ExecutorFactory executorFactory = new DefaultExecutorFactory();
private int jobThreadCount = 8;
private Map<String, Job.Factory> jobFactories = new HashMap<>();
private Map<String, Constraint.Factory> constraintFactories = new HashMap<>();
private List<ConstraintObserver> constraintObservers = new ArrayList<>();
private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker();
private List<Predicate<MinimalJobSpec>> reservedJobRunners = new ArrayList<>();
private ExecutorFactory executorFactory = new DefaultExecutorFactory();
private int minGeneralRunners = 4;
private int maxGeneralRunners = 16;
private long generalRunnerIdleTimeout = TimeUnit.MINUTES.toMillis(1);
private Map<String, Job.Factory> jobFactories = new HashMap<>();
private Map<String, Constraint.Factory> constraintFactories = new HashMap<>();
private List<ConstraintObserver> constraintObservers = new ArrayList<>();
private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker();
private List<Predicate<MinimalJobSpec>> reservedJobRunners = new ArrayList<>();
public @NonNull Builder setJobThreadCount(int jobThreadCount) {
this.jobThreadCount = jobThreadCount;
public @NonNull Builder setMinGeneralRunners(int minGeneralRunners) {
this.minGeneralRunners = minGeneralRunners;
return this;
}
public @NonNull Builder setMaxGeneralRunners(int maxGeneralRunners) {
this.maxGeneralRunners = maxGeneralRunners;
return this;
}
public @NonNull Builder setGeneralRunnerIdleTimeout(long generalRunnerIdleTimeout) {
this.generalRunnerIdleTimeout = generalRunnerIdleTimeout;
return this;
}
@@ -716,7 +735,9 @@ public class JobManager implements ConstraintObserver.Notifier {
}
public @NonNull Configuration build() {
return new Configuration(jobThreadCount,
return new Configuration(minGeneralRunners,
maxGeneralRunners,
generalRunnerIdleTimeout,
executorFactory,
new JobInstantiator(jobFactories),
new ConstraintInstantiator(constraintFactories),

View File

@@ -5,8 +5,6 @@ import android.os.PowerManager;
import androidx.annotation.NonNull;
import com.annimon.stream.Stream;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.jobs.MinimalJobSpec;
import org.thoughtcrime.securesms.util.WakeLockUtil;
@@ -33,21 +31,36 @@ class JobRunner extends Thread {
private final int id;
private final JobController jobController;
private final Predicate<MinimalJobSpec> jobPredicate;
private final long idleTimeoutMs;
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull Predicate<MinimalJobSpec> predicate) {
super("signal-JobRunner-" + id);
/**
* @param idleTimeoutMs If the runner experiences no activity within this duration, it will terminate. If set to 0, it will never terminate.
*/
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull Predicate<MinimalJobSpec> predicate, long idleTimeoutMs) {
super("JobRunner-" + (idleTimeoutMs == 0 ? "core-" : "temp-") + id);
this.application = application;
this.id = id;
this.jobController = jobController;
this.jobPredicate = predicate;
this.idleTimeoutMs = idleTimeoutMs;
}
@Override
public synchronized void run() {
//noinspection InfiniteLoopStatement
Log.i(TAG, "JobRunner " + id + " started" + (idleTimeoutMs > 0 ? " with idle timeout " + idleTimeoutMs + "ms" : " with no idle timeout"));
while (true) {
Job job = jobController.pullNextEligibleJobForExecution(jobPredicate);
Job job = jobController.pullNextEligibleJobForExecution(jobPredicate, idleTimeoutMs);
if (job == null && idleTimeoutMs > 0) {
Log.i(TAG, "JobRunner " + id + " terminating due to inactivity");
jobController.onRunnerTerminated(this);
break;
} else if (job == null) {
Log.i(TAG, "JobRunner " + id + " unexpectedly given a null job. Going around the loop.");
continue;
}
Job.Result result = run(job);
jobController.onJobFinished(job);
@@ -60,7 +73,7 @@ class JobRunner extends Thread {
} else if (result.isFailure()) {
List<Job> dependents = jobController.onFailure(job);
job.onFailure();
Stream.of(dependents).forEach(Job::onFailure);
dependents.stream().forEach(Job::onFailure);
if (result.getException() != null) {
throw result.getException();

View File

@@ -20,6 +20,9 @@ interface JobStorage {
@WorkerThread
fun getNextEligibleJob(currentTime: Long, filter: (MinimalJobSpec) -> Boolean): JobSpec?
@WorkerThread
fun getEligibleJobCount(currentTime: Long): Int
@WorkerThread
fun getJobsInQueue(queue: String): List<JobSpec>

View File

@@ -148,6 +148,27 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
@Synchronized
override fun getEligibleJobCount(currentTime: Long): Int {
val migrationJob: MinimalJobSpec? = migrationJobs.firstOrNull()
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) {
1
} else if (migrationJob != null) {
0
} else {
eligibleJobs
.asSequence()
.filter { job ->
// Filter out all jobs with unmet dependencies
dependenciesByJobId[job.id].isNullOrEmpty()
}
.filterNot { it.isRunning }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.count()
}
}
@Synchronized
override fun getJobsInQueue(queue: String): List<JobSpec> {
return minimalJobs

View File

@@ -76,7 +76,11 @@ class RestoreAttachmentJob private constructor(
"RestoreAttachmentJob::InitialRestore_01",
"RestoreAttachmentJob::InitialRestore_02",
"RestoreAttachmentJob::InitialRestore_03",
"RestoreAttachmentJob::InitialRestore_04"
"RestoreAttachmentJob::InitialRestore_04",
"RestoreAttachmentJob::InitialRestore_05",
"RestoreAttachmentJob::InitialRestore_06",
"RestoreAttachmentJob::InitialRestore_07",
"RestoreAttachmentJob::InitialRestore_08"
)
/** Job queues used when restoring an offloaded attachment. The number of queues in this set determine the level of parallelization. */

View File

@@ -56,10 +56,18 @@ class UploadAttachmentToArchiveJob private constructor(
/** A set of possible queues this job may use. The number of queues determines the parallelism. */
val QUEUES = setOf(
"ArchiveAttachmentJobs_1",
"ArchiveAttachmentJobs_2",
"ArchiveAttachmentJobs_3",
"ArchiveAttachmentJobs_4"
"ArchiveAttachmentJobs_01",
"ArchiveAttachmentJobs_02",
"ArchiveAttachmentJobs_03",
"ArchiveAttachmentJobs_04",
"ArchiveAttachmentJobs_05",
"ArchiveAttachmentJobs_06",
"ArchiveAttachmentJobs_07",
"ArchiveAttachmentJobs_08",
"ArchiveAttachmentJobs_09",
"ArchiveAttachmentJobs_10",
"ArchiveAttachmentJobs_11",
"ArchiveAttachmentJobs_12"
)
}

View File

@@ -0,0 +1,563 @@
package org.thoughtcrime.securesms.jobmanager
import android.app.Application
import assertk.assertThat
import assertk.assertions.hasSize
import assertk.assertions.isEqualTo
import assertk.assertions.isNull
import io.mockk.MockKAnnotations
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import io.mockk.verify
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage
import org.thoughtcrime.securesms.util.Debouncer
import kotlin.time.Duration.Companion.seconds
@Ignore("When running tests in bulk, this causes the JVM to OOM, I think because we're creating lots of threads that don't get cleaned up, and I haven't figured out a nice way to fix it yet.")
class JobControllerTest {
@MockK
private lateinit var application: Application
@MockK
private lateinit var jobStorage: JobStorage
@MockK
private lateinit var jobInstantiator: JobInstantiator
@MockK
private lateinit var constraintInstantiator: ConstraintInstantiator
@MockK
private lateinit var jobTracker: JobTracker
@MockK
private lateinit var scheduler: Scheduler
@MockK
private lateinit var debouncer: Debouncer
@MockK
private lateinit var callback: JobController.Callback
private lateinit var jobController: JobController
companion object {
private const val MIN_RUNNERS = 2
private const val MAX_RUNNERS = 5
}
@Before
fun setup() {
MockKAnnotations.init(this, relaxed = true)
// Mock default behavior
every { jobStorage.updateAllJobsToBePending() } returns Unit
every { debouncer.publish(any()) } returns Unit
jobController = JobController(
application,
jobStorage,
jobInstantiator,
constraintInstantiator,
jobTracker,
scheduler,
debouncer,
callback,
MIN_RUNNERS,
MAX_RUNNERS,
1.seconds.inWholeMilliseconds,
emptyList()
)
}
@Test
fun `init updates all jobs to pending`() {
// When
jobController.init()
// Then
verify { jobStorage.updateAllJobsToBePending() }
}
@Test
fun `submitNewJobChain inserts jobs and schedules them`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory")
val chain = listOf(listOf(testJob))
every { jobStorage.insertJobs(any()) } returns Unit
every { scheduler.schedule(any(), any<List<Constraint>>()) } returns Unit
// When
jobController.submitNewJobChain(chain)
// Then
verify { jobStorage.insertJobs(any()) }
verify { scheduler.schedule(0L, emptyList()) }
verify { testJob.onSubmit() }
}
@Test
fun `submitNewJobChain handles chain that exceeds maximum instances`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory") { params ->
every { params.maxInstancesForFactory } returns 1
}
every { jobStorage.getJobCountForFactory("TestFactory") } returns 1
val chain = listOf(listOf(testJob))
// When
jobController.submitNewJobChain(chain)
// Then
verify { jobTracker.onStateChange(testJob, JobTracker.JobState.IGNORED) }
verify(exactly = 0) { jobStorage.insertJobs(any()) }
}
@Test
fun `submitJobWithExistingDependencies handles failed dependencies`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory")
val dependsOn = setOf("failed-job-id")
every { jobTracker.haveAnyFailed(dependsOn) } returns true
every { jobStorage.getJobSpec("failed-job-id") } returns null
// When
jobController.submitJobWithExistingDependencies(testJob, dependsOn, null)
// Then
verify { testJob.onFailure() }
verify(exactly = 0) { jobStorage.insertJobs(any()) }
}
@Test
fun `cancelJob handles unknown job`() {
// Given
every { jobStorage.getJobSpec("unknown-job") } returns null
// When
jobController.cancelJob("unknown-job")
// Then - Should not crash
verify(exactly = 0) { jobStorage.deleteJob(any()) }
}
@Test
fun `pullNextEligibleJobForExecution with timeout returns null when no jobs available`() {
// Given
every { jobStorage.getNextEligibleJob(any(), any()) } returns null
// When
val result = jobController.pullNextEligibleJobForExecution({ true }, 100)
// Then
assertThat(result).isNull()
}
@Test
fun `pullNextEligibleJobForExecution marks job as running and tracks it`() {
// Given
val jobSpec = createJobSpec("test-job-1", "TestFactory")
val testJob = createTestJob("test-job-1", "TestFactory")
every { jobStorage.getNextEligibleJob(any(), any()) } returns jobSpec
every { jobStorage.getConstraintSpecs("test-job-1") } returns emptyList()
every { jobInstantiator.instantiate("TestFactory", any(), any()) } returns testJob
every { jobStorage.markJobAsRunning("test-job-1", any()) } returns Unit
// When
val result = jobController.pullNextEligibleJobForExecution({ true }, 0)
// Then
assertThat(result).isEqualTo(testJob)
verify { jobStorage.markJobAsRunning("test-job-1", any()) }
verify { jobTracker.onStateChange(testJob, JobTracker.JobState.RUNNING) }
}
@Test
fun `onSuccess deletes job and updates tracker`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory")
every { jobStorage.getDependencySpecsThatDependOnJob("test-job-1") } returns emptyList()
every { jobStorage.deleteJob("test-job-1") } returns Unit
// When
jobController.onSuccess(testJob, null)
// Then
verify { jobStorage.deleteJob("test-job-1") }
verify { jobTracker.onStateChange(testJob, JobTracker.JobState.SUCCESS) }
}
@Test
fun `onSuccess with output data updates dependent jobs`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory")
val outputData = "test-output".toByteArray()
val dependentSpec = DependencySpec("dependent-job", "test-job-1", false)
val dependentJobSpec = createJobSpec("dependent-job", "DependentFactory")
every { jobStorage.getDependencySpecsThatDependOnJob("test-job-1") } returns listOf(dependentSpec)
every { jobStorage.getJobSpec("dependent-job") } returns dependentJobSpec
every { jobStorage.updateJobs(any()) } returns Unit
every { jobStorage.deleteJob("test-job-1") } returns Unit
// When
jobController.onSuccess(testJob, outputData)
// Then
verify { jobStorage.updateJobs(any()) }
verify { jobStorage.deleteJob("test-job-1") }
}
@Test
fun `onFailure deletes job and all dependents`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory")
val dependentSpec = DependencySpec("dependent-job", "test-job-1", false)
val dependentJobSpec = createJobSpec("dependent-job", "DependentFactory")
val dependentJob = createTestJob("dependent-job", "DependentFactory")
every { jobStorage.getDependencySpecsThatDependOnJob("test-job-1") } returns listOf(dependentSpec)
every { jobStorage.getJobSpec("dependent-job") } returns dependentJobSpec
every { jobStorage.getConstraintSpecs("dependent-job") } returns emptyList()
every { jobInstantiator.instantiate("DependentFactory", any(), any()) } returns dependentJob
every { jobStorage.deleteJobs(any()) } returns Unit
// When
val dependents = jobController.onFailure(testJob)
// Then
assertThat(dependents).hasSize(1)
assertThat(dependents[0]).isEqualTo(dependentJob)
verify { jobStorage.deleteJobs(listOf("test-job-1", "dependent-job")) }
verify { jobTracker.onStateChange(testJob, JobTracker.JobState.FAILURE) }
verify { jobTracker.onStateChange(dependentJob, JobTracker.JobState.FAILURE) }
}
@Test
fun `onRetry updates job with backoff and schedules retry`() {
// Given
val testJob = createTestJob("test-job-1", "TestFactory")
val backoffInterval = 5000L
every { jobStorage.updateJobAfterRetry(any(), any(), any(), any(), any()) } returns Unit
every { jobStorage.getConstraintSpecs("test-job-1") } returns emptyList()
every { scheduler.schedule(backoffInterval, emptyList()) } returns Unit
// When
jobController.onRetry(testJob, backoffInterval)
// Then
verify { jobStorage.updateJobAfterRetry("test-job-1", any(), 1, backoffInterval, any()) }
verify { jobTracker.onStateChange(testJob, JobTracker.JobState.PENDING) }
verify { scheduler.schedule(backoffInterval, emptyList()) }
}
@Test
fun `submitJobs filters out jobs that exceed maximum instances`() {
// Given
val validJob = createTestJob("valid-job", "ValidFactory") { params ->
every { params.maxInstancesForFactory } returns 5
every { params.maxInstancesForQueue } returns Job.Parameters.UNLIMITED
}
val invalidJob = createTestJob("invalid-job", "InvalidFactory") { params ->
every { params.maxInstancesForFactory } returns 1
every { params.maxInstancesForQueue } returns Job.Parameters.UNLIMITED
}
every { jobStorage.getJobCountForFactory("ValidFactory") } returns 2
every { jobStorage.getJobCountForFactory("InvalidFactory") } returns 1
every { jobStorage.insertJobs(any()) } returns Unit
every { scheduler.schedule(any(), any<List<Constraint>>()) } returns Unit
// When
jobController.submitJobs(listOf(validJob, invalidJob))
// Then
verify { jobTracker.onStateChange(invalidJob, JobTracker.JobState.IGNORED) }
verify { jobStorage.insertJobs(any()) }
verify { validJob.onSubmit() }
verify(exactly = 0) { invalidJob.onSubmit() }
}
@Test
fun `submitJobs handles empty list after filtering`() {
// Given
val invalidJob = createTestJob("invalid-job", "InvalidFactory") { params ->
every { params.maxInstancesForFactory } returns 1
}
every { jobStorage.getJobCountForFactory("InvalidFactory") } returns 1
// When
jobController.submitJobs(listOf(invalidJob))
// Then
verify(exactly = 0) { jobStorage.insertJobs(any()) }
verify(exactly = 0) { scheduler.schedule(any(), any<List<Constraint>>()) }
}
@Test
fun `pullNextEligibleJobForExecution publishes empty callback when no running jobs`() {
// Given
every { jobStorage.getNextEligibleJob(any(), any()) } returns null
// When
jobController.pullNextEligibleJobForExecution({ true }, 10)
// Then
verify { debouncer.publish(any()) }
}
@Test
fun `findJobs returns filtered results from storage`() {
// Given
val predicate: (JobSpec) -> Boolean = { it.factoryKey == "TestFactory" }
val jobSpecs = listOf(
createJobSpec("job-1", "TestFactory"),
createJobSpec("job-2", "OtherFactory")
)
every { jobStorage.getAllMatchingFilter(any()) } returns jobSpecs.filter(predicate)
// When
val result = jobController.findJobs(predicate)
// Then
assertThat(result).hasSize(1)
assertThat(result[0].id).isEqualTo("job-1")
verify { jobStorage.getAllMatchingFilter(any()) }
}
@Test
fun `cancelJob handles job not found gracefully`() {
// Given
val nonExistentJobId = "non-existent-job"
every { jobStorage.getJobSpec(nonExistentJobId) } returns null
// When
jobController.cancelJob(nonExistentJobId)
// Then
verify(exactly = 0) { jobStorage.deleteJob(any()) }
verify(exactly = 0) { jobStorage.deleteJobs(any()) }
}
@Test
fun `cancelAllInQueue cancels all jobs in specified queue`() {
// Given
val queueName = "test-queue"
val job1Spec = createJobSpec("job-1", "Factory1")
val job2Spec = createJobSpec("job-2", "Factory2")
val job1 = createTestJob("job-1", "Factory1")
val job2 = createTestJob("job-2", "Factory2")
every { jobStorage.getJobsInQueue(queueName) } returns listOf(job1Spec, job2Spec)
every { jobStorage.getJobSpec("job-1") } returns job1Spec
every { jobStorage.getJobSpec("job-2") } returns job2Spec
every { jobStorage.getConstraintSpecs(any()) } returns emptyList()
every { jobInstantiator.instantiate("Factory1", any(), any()) } returns job1
every { jobInstantiator.instantiate("Factory2", any(), any()) } returns job2
every { jobStorage.getDependencySpecsThatDependOnJob(any()) } returns emptyList()
every { jobStorage.deleteJob(any()) } returns Unit
every { jobStorage.deleteJobs(any()) } returns Unit
// When
jobController.cancelAllInQueue(queueName)
// Then
verify { job1.cancel() }
verify { job2.cancel() }
verify { job1.onFailure() }
verify { job2.onFailure() }
}
@Test
fun `onFailure handles cascading failures correctly`() {
// Given - Create a chain: job1 -> job2 -> job3
val job1 = createTestJob("job-1", "Factory1")
val job2Spec = createJobSpec("job-2", "Factory2")
val job3Spec = createJobSpec("job-3", "Factory3")
val job2 = createTestJob("job-2", "Factory2")
val job3 = createTestJob("job-3", "Factory3")
val job2Dependency = DependencySpec("job-2", "job-1", false)
val job3Dependency = DependencySpec("job-3", "job-2", false)
every { jobStorage.getDependencySpecsThatDependOnJob("job-1") } returns listOf(job2Dependency)
every { jobStorage.getDependencySpecsThatDependOnJob("job-2") } returns listOf(job3Dependency)
every { jobStorage.getDependencySpecsThatDependOnJob("job-3") } returns emptyList()
every { jobStorage.getJobSpec("job-2") } returns job2Spec
every { jobStorage.getJobSpec("job-3") } returns job3Spec
every { jobStorage.getConstraintSpecs(any()) } returns emptyList()
every { jobInstantiator.instantiate("Factory2", any(), any()) } returns job2
every { jobInstantiator.instantiate("Factory3", any(), any()) } returns job3
every { jobStorage.deleteJobs(any()) } returns Unit
// When
val dependents = jobController.onFailure(job1)
// Then
assertThat(dependents).hasSize(1)
assertThat(dependents[0]).isEqualTo(job2)
verify { jobStorage.deleteJobs(listOf("job-1", "job-2")) }
verify { jobTracker.onStateChange(job1, JobTracker.JobState.FAILURE) }
verify { jobTracker.onStateChange(job2, JobTracker.JobState.FAILURE) }
}
@Test
fun `onFailure handles null job specs in dependency chain`() {
// Given
val job1 = createTestJob("job-1", "Factory1")
val job2Dependency = DependencySpec("job-2", "job-1", false)
val job3Dependency = DependencySpec("job-3", "job-1", false)
every { jobStorage.getDependencySpecsThatDependOnJob("job-1") } returns listOf(job2Dependency, job3Dependency)
every { jobStorage.getJobSpec("job-2") } returns null // Job was already deleted
every { jobStorage.getJobSpec("job-3") } returns createJobSpec("job-3", "Factory3")
every { jobStorage.getConstraintSpecs("job-3") } returns emptyList()
every { jobInstantiator.instantiate("Factory3", any(), any()) } returns createTestJob("job-3", "Factory3")
every { jobStorage.deleteJobs(any()) } returns Unit
// When
val dependents = jobController.onFailure(job1)
// Then - Should handle null job spec gracefully
assertThat(dependents).hasSize(1)
assertThat(dependents[0].id).isEqualTo("job-3")
}
@Test
fun `startJobRunners - creates minimum number of runners, even with no eligible jobs`() {
// Given
every { jobStorage.getEligibleJobCount(any()) } returns 0
// When
jobController.startJobRunners()
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MIN_RUNNERS)
}
@Test
fun `startJobRunners - creates runners to satisfy demand`() {
// Given
every { jobStorage.getEligibleJobCount(any()) } returns MAX_RUNNERS
// When
jobController.startJobRunners()
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS)
}
@Test
fun `startJobRunners - does not exceed max runners`() {
// Given
every { jobStorage.getEligibleJobCount(any()) } returns MAX_RUNNERS * 2
// When
jobController.startJobRunners()
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS)
}
@Test
fun `maybeScaleUpRunners - creates runners to satisfy demand`() {
// When
jobController.runnersStarted.set(true)
jobController.maybeScaleUpRunners(MAX_RUNNERS)
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS)
}
@Test
fun `maybeScaleUpRunners - does not exceed max runners`() {
// When
jobController.runnersStarted.set(true)
jobController.maybeScaleUpRunners(MAX_RUNNERS * 2)
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS)
}
@Test
fun `onRunnerTerminated - decrements active runners`() {
// Given
every { jobStorage.getEligibleJobCount(any()) } returns MAX_RUNNERS
jobController.startJobRunners()
// When
jobController.onRunnerTerminated(jobController.activeGeneralRunners.first())
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS - 1)
}
/**
* @param parameterConfig Allows you to mock out specific fields on the [Job.Parameters].
*/
private fun createTestJob(
id: String,
factoryKey: String,
parameterConfig: ((Job.Parameters) -> Unit)? = null
): Job {
val job = mockk<Job>(relaxed = true)
every { job.id } returns id
every { job.factoryKey } returns factoryKey
every { job.runAttempt } returns 0
every { job.serialize() } returns null
every { job.parameters } returns createJobParameters(id, parameterConfig)
return job
}
private fun createJobParameters(
id: String,
config: ((Job.Parameters) -> Unit)? = null
): Job.Parameters {
val params = mockk<Job.Parameters>(relaxed = true)
every { params.id } returns id
every { params.maxInstancesForFactory } returns Job.Parameters.UNLIMITED
every { params.maxInstancesForQueue } returns Job.Parameters.UNLIMITED
every { params.queue } returns null
every { params.constraintKeys } returns emptyList()
every { params.initialDelay } returns 0L
config?.invoke(params)
return params
}
private fun createJobSpec(id: String, factoryKey: String): JobSpec {
return JobSpec(
id = id,
factoryKey = factoryKey,
queueKey = null,
createTime = System.currentTimeMillis(),
lastRunAttemptTime = 0L,
nextBackoffInterval = 0L,
runAttempt = 0,
maxAttempts = 3,
lifespan = -1L,
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false,
globalPriority = 0,
queuePriority = 0,
initialDelay = 0L
)
}
}

View File

@@ -829,6 +829,14 @@ class FastJobStorageTest {
assertThat(subject.getNextEligibleJob(20, NO_PREDICATE)).isEqualTo(fullSpec1.jobSpec)
}
@Test
fun `getEligibleJobCount - general`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
subject.init()
assertThat(subject.getEligibleJobCount(0)).isEqualTo(1)
}
@Test
fun `deleteJobs - writes to database`() {
val database = mockDatabase(DataSet1.FULL_SPECS)