Fix bug allowing concurrent execution of jobs in the same queue.

There was an issue where a higher priority job in the same queue would
become the new most eligible job, even if the current most eligible job
was actively running.
This commit is contained in:
Greyson Parrelli
2026-04-24 13:26:12 +00:00
parent 572c11ee6d
commit 799e57dbe9
2 changed files with 24 additions and 0 deletions
@@ -492,6 +492,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
private fun placeJobInEligibleList(jobCandidate: MinimalJobSpec) {
val existingJobInQueue = jobCandidate.queueKey?.let { mostEligibleJobForQueue[it] }
if (existingJobInQueue != null) {
if (existingJobInQueue.isRunning) {
return
}
if (jobCandidate.globalPriority < existingJobInQueue.globalPriority) {
return
}
@@ -775,6 +775,26 @@ class FastJobStorageTest {
assertThat(subject.getNextEligibleJob(100, NO_PREDICATE)).isEqualTo(higherPriorityJob)
}
@Test
fun `getNextEligibleJob - newly-inserted higher-priority job does not replace running job in same queue`() {
// This test is for a specific bug, where a newly-added job could replace a lower-priority job in the same
// queue even if the existing job was actively running. This is a defensive test to make sure that we don't replace
// running jobs with another job, even if it's higher priority.
val runningJob = jobSpec(id = "1", factoryKey = "f1", queueKey = "q", isRunning = true)
val subject = FastJobStorage(mockDatabase(listOf(FullSpec(runningJob, emptyList(), emptyList()))))
subject.init()
assertThat(subject.getNextEligibleJob(10, NO_PREDICATE)).isNull()
val higherPriorityJob = jobSpec(id = "2", factoryKey = "f2", queueKey = "q", globalPriority = Job.Parameters.PRIORITY_HIGH)
subject.insertJobs(listOf(FullSpec(higherPriorityJob, emptyList(), emptyList())))
assertThat(subject.getNextEligibleJob(10, NO_PREDICATE)).isNull()
subject.deleteJob("1")
assertThat(subject.getNextEligibleJob(10, NO_PREDICATE)).isEqualTo(higherPriorityJob)
}
@Test
fun `getNextEligibleJob - updating job to have a higher global priority replaces lower priority in queue`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))