Add additional logging around JobRunners.

This commit is contained in:
Greyson Parrelli
2025-08-26 11:17:45 -04:00
committed by Michelle Tang
parent 23bbe704ab
commit 1a1ddbfa39
3 changed files with 74 additions and 41 deletions
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,15 +44,15 @@ class JobController {
private static final Predicate<MinimalJobSpec> NO_PREDICATE = spec -> true; private static final Predicate<MinimalJobSpec> NO_PREDICATE = spec -> true;
private final Application application; private final Application application;
private final JobStorage jobStorage; private final JobStorage jobStorage;
private final JobInstantiator jobInstantiator; private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator; private final ConstraintInstantiator constraintInstantiator;
private final JobTracker jobTracker; private final JobTracker jobTracker;
private final Scheduler scheduler; private final Scheduler scheduler;
private final Debouncer debouncer; private final Debouncer debouncer;
private final Callback callback; private final Callback callback;
private final Map<String, Job> runningJobs; private final Map<String, ActiveJobInfo> runningJobs;
private final int minGeneralRunners; private final int minGeneralRunners;
private final int maxGeneralRunners; private final int maxGeneralRunners;
@@ -242,11 +243,11 @@ class JobController {
List<Job> inactiveJobDependents = Collections.emptyList(); List<Job> inactiveJobDependents = Collections.emptyList();
synchronized (this) { synchronized (this) {
Job runningJob = runningJobs.get(id); ActiveJobInfo runningJob = runningJobs.get(id);
if (runningJob != null) { if (runningJob != null) {
Log.w(TAG, JobLogger.format(runningJob, "Canceling while running.")); Log.w(TAG, JobLogger.format(runningJob.job, "Canceling while running."));
runningJob.cancel(); runningJob.job.cancel();
} else { } else {
JobSpec jobSpec = jobStorage.getJobSpec(id); JobSpec jobSpec = jobStorage.getJobSpec(id);
@@ -373,7 +374,7 @@ class JobController {
* @return Job to execute, or null if the timeout is hit * @return Job to execute, or null if the timeout is hit
*/ */
@WorkerThread @WorkerThread
synchronized @Nullable Job pullNextEligibleJobForExecution(@NonNull Predicate<MinimalJobSpec> predicate, long timeoutMs) { synchronized @Nullable Job pullNextEligibleJobForExecution(@NonNull Predicate<MinimalJobSpec> predicate, String runnerName, long timeoutMs) {
try { try {
Job job; Job job;
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@@ -395,7 +396,7 @@ class JobController {
} }
jobStorage.markJobAsRunning(job.getId(), System.currentTimeMillis()); jobStorage.markJobAsRunning(job.getId(), System.currentTimeMillis());
runningJobs.put(job.getId(), job); runningJobs.put(job.getId(), new ActiveJobInfo(job, runnerName, timeoutMs == 0));
jobTracker.onStateChange(job, JobTracker.JobState.RUNNING); jobTracker.onStateChange(job, JobTracker.JobState.RUNNING);
return job; return job;
@@ -410,6 +411,7 @@ class JobController {
*/ */
@WorkerThread @WorkerThread
synchronized @NonNull String getDebugInfo() { synchronized @NonNull String getDebugInfo() {
List<JobSpec> running = runningJobs.keySet().stream().map(jobStorage::getJobSpec).collect(java.util.stream.Collectors.toList());
List<JobSpec> jobs = jobStorage.debugGetJobSpecs(1000); List<JobSpec> jobs = jobStorage.debugGetJobSpecs(1000);
List<ConstraintSpec> constraints = jobStorage.debugGetConstraintSpecs(1000); List<ConstraintSpec> constraints = jobStorage.debugGetConstraintSpecs(1000);
List<DependencySpec> dependencies = jobStorage.debugGetAllDependencySpecs(); List<DependencySpec> dependencies = jobStorage.debugGetAllDependencySpecs();
@@ -417,7 +419,18 @@ class JobController {
StringBuilder info = new StringBuilder(); StringBuilder info = new StringBuilder();
info.append("-- Jobs\n"); info.append("-- Running Jobs\n");
if (!jobs.isEmpty()) {
running.stream().forEach(j -> {
ActiveJobInfo activeInfo = Objects.requireNonNull(runningJobs.get(j.getId()));
info.append("[").append(activeInfo.runnerName).append("] ").append(j.toString()).append('\n');
});
} else {
info.append("None\n");
}
info.append("\n-- Jobs\n");
if (!jobs.isEmpty()) { if (!jobs.isEmpty()) {
Stream.of(jobs).forEach(j -> info.append(j.toString()).append('\n')); Stream.of(jobs).forEach(j -> info.append(j.toString()).append('\n'));
} else { } else {
@@ -439,10 +452,14 @@ class JobController {
} }
info.append("\n-- Additional Details\n"); info.append("\n-- Additional Details\n");
info.append("Runners started: ").append(runnersStarted.get()).append('\n');
info.append("General runner count: ").append(activeGeneralRunners.size()).append('\n');
info.append("Reserved runner count: ").append(reservedRunnerPredicates.size()).append("\n\n");
if (additional != null) { if (additional != null) {
info.append(additional).append('\n'); info.append(additional).append('\n');
} else { } else {
info.append("None\n"); info.append("No job storage info.\n");
} }
return info.toString(); return info.toString();
@@ -462,9 +479,9 @@ class JobController {
for (Predicate<MinimalJobSpec> predicate : reservedRunnerPredicates) { for (Predicate<MinimalJobSpec> predicate : reservedRunnerPredicates) {
int id = nextRunnerId.incrementAndGet(); int id = nextRunnerId.incrementAndGet();
JobRunner runner = new JobRunner(application, id, this, predicate == null ? NO_PREDICATE : predicate, 0); JobRunner runner = new JobRunner(application, JobRunner.generateName(id, true, true), this, predicate == null ? NO_PREDICATE : predicate, 0);
runner.start(); runner.start();
Log.i(TAG, "Spawned new reserved JobRunner[" + id + "]"); Log.i(TAG, "Spawned new runner " + runner.getName());
} }
for (int i = 0; i < minGeneralRunners; i++) { for (int i = 0; i < minGeneralRunners; i++) {
@@ -501,17 +518,17 @@ class JobController {
private synchronized void spawnGeneralRunner(long timeOutMs) { private synchronized void spawnGeneralRunner(long timeOutMs) {
int id = nextRunnerId.incrementAndGet(); int id = nextRunnerId.incrementAndGet();
JobRunner runner = new JobRunner(application, id, this, NO_PREDICATE, timeOutMs); JobRunner runner = new JobRunner(application, JobRunner.generateName(id, false, timeOutMs == 0), this, NO_PREDICATE, timeOutMs);
runner.start(); runner.start();
activeGeneralRunners.add(runner); activeGeneralRunners.add(runner);
Log.d(TAG, "Spawned new " + (timeOutMs == 0 ? "core" : "temporary") + " general JobRunner[" + id + "] (CurrentActive: " + activeGeneralRunners.size() + ")"); Log.d(TAG, "Spawned new general runner " + runner.getName() + " (CurrentActive: " + activeGeneralRunners.size() + ")");
} }
@VisibleForTesting @VisibleForTesting
synchronized void onRunnerTerminated(@NonNull JobRunner runner) { synchronized void onRunnerTerminated(@NonNull JobRunner runner) {
activeGeneralRunners.remove(runner); activeGeneralRunners.remove(runner);
Log.i(TAG, "JobRunner[" + runner.getId() + "] terminated. (CurrentActive: " + activeGeneralRunners.size() + ")"); Log.i(TAG, runner.getName() + " terminated. (CurrentActive: " + activeGeneralRunners.size() + ")");
} }
@WorkerThread @WorkerThread
@@ -712,4 +729,10 @@ class JobController {
interface Callback { interface Callback {
void onEmpty(); void onEmpty();
} }
record ActiveJobInfo(
@NonNull Job job,
String runnerName,
boolean coreRunner
) {}
} }
@@ -28,7 +28,6 @@ class JobRunner extends Thread {
private static long WAKE_LOCK_TIMEOUT = TimeUnit.MINUTES.toMillis(10); private static long WAKE_LOCK_TIMEOUT = TimeUnit.MINUTES.toMillis(10);
private final Application application; private final Application application;
private final int id;
private final JobController jobController; private final JobController jobController;
private final Predicate<MinimalJobSpec> jobPredicate; private final Predicate<MinimalJobSpec> jobPredicate;
private final long idleTimeoutMs; private final long idleTimeoutMs;
@@ -36,11 +35,10 @@ class JobRunner extends Thread {
/** /**
* @param idleTimeoutMs If the runner experiences no activity within this duration, it will terminate. If set to 0, it will never terminate. * @param idleTimeoutMs If the runner experiences no activity within this duration, it will terminate. If set to 0, it will never terminate.
*/ */
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull Predicate<MinimalJobSpec> predicate, long idleTimeoutMs) { JobRunner(@NonNull Application application, @NonNull String name, @NonNull JobController jobController, @NonNull Predicate<MinimalJobSpec> predicate, long idleTimeoutMs) {
super("JobRunner-" + (idleTimeoutMs == 0 ? "core-" : "temp-") + id); super(name);
this.application = application; this.application = application;
this.id = id;
this.jobController = jobController; this.jobController = jobController;
this.jobPredicate = predicate; this.jobPredicate = predicate;
this.idleTimeoutMs = idleTimeoutMs; this.idleTimeoutMs = idleTimeoutMs;
@@ -48,16 +46,16 @@ class JobRunner extends Thread {
@Override @Override
public synchronized void run() { public synchronized void run() {
Log.i(TAG, "JobRunner " + id + " started" + (idleTimeoutMs > 0 ? " with idle timeout " + idleTimeoutMs + "ms" : " with no idle timeout")); Log.i(TAG, getName() + " started" + (idleTimeoutMs > 0 ? " with idle timeout " + idleTimeoutMs + "ms" : " with no idle timeout"));
while (true) { while (true) {
Job job = jobController.pullNextEligibleJobForExecution(jobPredicate, idleTimeoutMs); Job job = jobController.pullNextEligibleJobForExecution(jobPredicate, getName(), idleTimeoutMs);
if (job == null && idleTimeoutMs > 0) { if (job == null && idleTimeoutMs > 0) {
Log.i(TAG, "JobRunner " + id + " terminating due to inactivity"); Log.i(TAG, getName() + " terminating due to inactivity");
jobController.onRunnerTerminated(this); jobController.onRunnerTerminated(this);
break; break;
} else if (job == null) { } else if (job == null) {
Log.i(TAG, "JobRunner " + id + " unexpectedly given a null job. Going around the loop."); Log.i(TAG, getName() + " unexpectedly given a null job. Going around the loop.");
continue; continue;
} }
@@ -86,10 +84,10 @@ class JobRunner extends Thread {
private Job.Result run(@NonNull Job job) { private Job.Result run(@NonNull Job job) {
long runStartTime = System.currentTimeMillis(); long runStartTime = System.currentTimeMillis();
Log.i(TAG, JobLogger.format(job, String.valueOf(id), "Running job.")); Log.i(TAG, JobLogger.format(job, getName(), "Running job."));
if (isJobExpired(job)) { if (isJobExpired(job)) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing after surpassing its lifespan.")); Log.w(TAG, JobLogger.format(job, getName(), "Failing after surpassing its lifespan."));
return Job.Result.failure(); return Job.Result.failure();
} }
@@ -101,14 +99,14 @@ class JobRunner extends Thread {
result = job.run(); result = job.run();
if (job.isCanceled()) { if (job.isCanceled()) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing because the job was canceled.")); Log.w(TAG, JobLogger.format(job, getName(), "Failing because the job was canceled."));
result = Job.Result.failure(); result = Job.Result.failure();
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing fatally due to an unexpected runtime exception."), e); Log.w(TAG, JobLogger.format(job, getName(), "Failing fatally due to an unexpected runtime exception."), e);
return Job.Result.fatalFailure(e); return Job.Result.fatalFailure(e);
} catch (Exception e) { } catch (Exception e) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing due to an unexpected exception."), e); Log.w(TAG, JobLogger.format(job, getName(), "Failing due to an unexpected exception."), e);
return Job.Result.failure(); return Job.Result.failure();
} finally { } finally {
if (wakeLock != null) { if (wakeLock != null) {
@@ -122,7 +120,7 @@ class JobRunner extends Thread {
job.getRunAttempt() + 1 >= job.getParameters().getMaxAttempts() && job.getRunAttempt() + 1 >= job.getParameters().getMaxAttempts() &&
job.getParameters().getMaxAttempts() != Job.Parameters.UNLIMITED) job.getParameters().getMaxAttempts() != Job.Parameters.UNLIMITED)
{ {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing after surpassing its max number of attempts.")); Log.w(TAG, JobLogger.format(job, getName(), "Failing after surpassing its max number of attempts."));
return Job.Result.failure(); return Job.Result.failure();
} }
@@ -141,11 +139,23 @@ class JobRunner extends Thread {
private void printResult(@NonNull Job job, @NonNull Job.Result result, long runStartTime) { private void printResult(@NonNull Job job, @NonNull Job.Result result, long runStartTime) {
if (result.getException() != null) { if (result.getException() != null) {
Log.e(TAG, JobLogger.format(job, String.valueOf(id), "Job failed with a fatal exception. Crash imminent.")); Log.e(TAG, JobLogger.format(job, getName(), "Job failed with a fatal exception. Crash imminent."));
} else if (result.isFailure()) { } else if (result.isFailure()) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Job failed.")); Log.w(TAG, JobLogger.format(job, getName(), "Job failed."));
} else { } else {
Log.i(TAG, JobLogger.format(job, String.valueOf(id), "Job finished with result " + result + " in " + (System.currentTimeMillis() - runStartTime) + " ms.")); Log.i(TAG, JobLogger.format(job, getName(), "Job finished with result " + result + " in " + (System.currentTimeMillis() - runStartTime) + " ms."));
} }
} }
static @NonNull String generateName(int id, boolean reserved, boolean core) {
if (reserved) {
return "JobRunner-Rsrv-" + id;
}
if (core) {
return "JobRunner-Core-" + id;
}
return "JobRunner-Temp-" + id;
}
} }
@@ -157,7 +157,7 @@ class JobControllerTest {
every { jobStorage.getNextEligibleJob(any(), any()) } returns null every { jobStorage.getNextEligibleJob(any(), any()) } returns null
// When // When
val result = jobController.pullNextEligibleJobForExecution({ true }, 100) val result = jobController.pullNextEligibleJobForExecution({ true }, "runner", 100)
// Then // Then
assertThat(result).isNull() assertThat(result).isNull()
@@ -175,7 +175,7 @@ class JobControllerTest {
every { jobStorage.markJobAsRunning("test-job-1", any()) } returns Unit every { jobStorage.markJobAsRunning("test-job-1", any()) } returns Unit
// When // When
val result = jobController.pullNextEligibleJobForExecution({ true }, 0) val result = jobController.pullNextEligibleJobForExecution({ true }, "runner", 0)
// Then // Then
assertThat(result).isEqualTo(testJob) assertThat(result).isEqualTo(testJob)
@@ -313,7 +313,7 @@ class JobControllerTest {
every { jobStorage.getNextEligibleJob(any(), any()) } returns null every { jobStorage.getNextEligibleJob(any(), any()) } returns null
// When // When
jobController.pullNextEligibleJobForExecution({ true }, 10) jobController.pullNextEligibleJobForExecution({ true }, "runner", 10)
// Then // Then
verify { debouncer.publish(any()) } verify { debouncer.publish(any()) }