Small JobManager tweaks to scale more often and improve debugging info.

This commit is contained in:
Greyson Parrelli
2025-08-28 16:41:15 -04:00
parent 3bcfb5ab61
commit a5c4c3b54a
3 changed files with 16 additions and 9 deletions

View File

@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
/**
@@ -104,6 +105,7 @@ class JobController {
synchronized void wakeUp() {
notifyAll();
maybeScaleUpRunners(() -> jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
@WorkerThread
@@ -145,7 +147,7 @@ class JobController {
synchronized (this) {
notifyAll();
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
maybeScaleUpRunners(() -> jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
}
@@ -197,7 +199,7 @@ class JobController {
synchronized (this) {
notifyAll();
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
maybeScaleUpRunners(() -> jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
}
@@ -233,7 +235,7 @@ class JobController {
synchronized (this) {
notifyAll();
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
maybeScaleUpRunners(() -> jobStorage.getEligibleJobCount(System.currentTimeMillis()));
}
}
@@ -492,7 +494,7 @@ class JobController {
spawnGeneralRunner(0);
}
maybeScaleUpRunners(jobStorage.getEligibleJobCount(System.currentTimeMillis()));
maybeScaleUpRunners(() -> jobStorage.getEligibleJobCount(System.currentTimeMillis()));
notifyAll();
}
@@ -501,11 +503,12 @@ class JobController {
* Scales up the number of {@link JobRunner}s to satisfy the number of eligible jobs, if needed.
*/
@VisibleForTesting
synchronized void maybeScaleUpRunners(int eligibleJobCount) {
synchronized void maybeScaleUpRunners(IntSupplier eligibleJobCountSupplier) {
if (!runnersStarted.get()) {
return;
}
int eligibleJobCount = eligibleJobCountSupplier.getAsInt();
int activeRunners = this.activeGeneralRunners.size();
int maxPossibleRunnersToSpawn = maxGeneralRunners - activeRunners;
int runnersToCoverEligibleJobs = eligibleJobCount - activeRunners;

View File

@@ -113,7 +113,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
factoryCountIndex.getOrPut(minimalJobSpec.factoryKey) { AtomicInteger(0) }.incrementAndGet()
constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs.toMutableList()
dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs.toMutableList()
if (fullSpec.dependencySpecs.isNotEmpty()) {
dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs.toMutableList()
}
}
stopwatch?.split("cache")
stopwatch?.stop(TAG)
@@ -422,6 +424,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun debugAdditionalDetails(): String {
val nonEmptyDependencies = dependenciesByJobId.filterValues { it.isNotEmpty() }
return buildString {
appendLine("minimalJobs: Size(${minimalJobs.size}), Items(${minimalJobs.joinToString(", ") { it.toLogString() }})")
appendLine("jobSpecCache: Size(${jobSpecCache.size}), Items(${jobSpecCache.keys.joinToString(", ") { it.toLogString() }})")
@@ -429,7 +433,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
appendLine("migrationJobs: Size(${migrationJobs.size}), Items(${migrationJobs.joinToString(", ") { it.toLogString() }})")
appendLine("mostEligibleForQueue: Size(${mostEligibleJobForQueue.size}), Items(${mostEligibleJobForQueue.entries.joinToString(", ") { "[${it.key} => ${it.value.toLogString()}]" }})")
appendLine("constraintsByJobId: Size(${constraintsByJobId.size}), Items(${constraintsByJobId.entries.joinToString(", ") { "[${it.key.toLogString()} => ${it.value.joinToString(", ") { c -> c.toLogString() }}]" }})")
appendLine("dependenciesByJobId: Size(${dependenciesByJobId.size}), Items(${dependenciesByJobId.entries.joinToString(", ") { "[${it.key.toLogString()} => ${it.value.map { d -> d.toLogString() }}]" }})")
appendLine("dependenciesByJobId: Size(${nonEmptyDependencies.size}), Items(${nonEmptyDependencies.entries.joinToString(", ") { "[${it.key.toLogString()} => ${it.value.map { d -> d.toLogString() }}]" }})")
}
}

View File

@@ -479,7 +479,7 @@ class JobControllerTest {
fun `maybeScaleUpRunners - creates runners to satisfy demand`() {
// When
jobController.runnersStarted.set(true)
jobController.maybeScaleUpRunners(MAX_RUNNERS)
jobController.maybeScaleUpRunners { MAX_RUNNERS }
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS)
@@ -489,7 +489,7 @@ class JobControllerTest {
fun `maybeScaleUpRunners - does not exceed max runners`() {
// When
jobController.runnersStarted.set(true)
jobController.maybeScaleUpRunners(MAX_RUNNERS * 2)
jobController.maybeScaleUpRunners { MAX_RUNNERS * 2 }
// Then
assertThat(jobController.activeGeneralRunners.size).isEqualTo(MAX_RUNNERS)