Move constraint filtering down into JobStorage to improve perf.

This commit is contained in:
Greyson Parrelli
2024-07-18 13:59:24 -04:00
committed by Nicholas Tinsley
parent 36dface175
commit 06d475fb6e
11 changed files with 187 additions and 228 deletions

View File

@@ -197,28 +197,6 @@ class JobDatabase(
.readToList { it.toJobSpec() }
}
@Synchronized
fun getJobSpecsByKeys(keys: Collection<String>): List<JobSpec> {
if (keys.isEmpty()) {
return emptyList()
}
val output: MutableList<JobSpec> = ArrayList(keys.size)
for (query in SqlUtil.buildCollectionQuery(Jobs.JOB_SPEC_ID, keys)) {
readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.where(query.where, query.whereArgs)
.run()
.forEach {
output += it.toJobSpec()
}
}
return output
}
@Synchronized
fun getMostEligibleJobInQueue(queue: String): JobSpec? {
return readableDatabase

View File

@@ -15,6 +15,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec;
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.jobs.MinimalJobSpec;
import org.thoughtcrime.securesms.util.Debouncer;
import java.util.ArrayList;
@@ -319,7 +320,7 @@ class JobController {
* When the job returned from this method has been run, you must call {@link #onJobFinished(Job)}.
*/
@WorkerThread
synchronized @NonNull Job pullNextEligibleJobForExecution(@NonNull JobPredicate predicate) {
synchronized @NonNull Job pullNextEligibleJobForExecution(@NonNull Predicate<MinimalJobSpec> predicate) {
try {
Job job;
@@ -479,24 +480,27 @@ class JobController {
}
@WorkerThread
private @Nullable Job getNextEligibleJobForExecution(@NonNull JobPredicate predicate) {
List<JobSpec> jobSpecs = Stream.of(jobStorage.getPendingJobsWithNoDependenciesInCreatedOrder(System.currentTimeMillis()))
.filter(predicate::shouldRun)
.toList();
private @Nullable Job getNextEligibleJobForExecution(@NonNull Predicate<MinimalJobSpec> predicate) {
JobSpec jobSpec = jobStorage.getNextEligibleJob(System.currentTimeMillis(), minimalJobSpec -> {
if (!predicate.test(minimalJobSpec)) {
return false;
}
for (JobSpec jobSpec : jobSpecs) {
List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId());
List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(minimalJobSpec.getId());
List<Constraint> constraints = Stream.of(constraintSpecs)
.map(ConstraintSpec::getFactoryKey)
.map(constraintInstantiator::instantiate)
.toList();
if (Stream.of(constraints).allMatch(Constraint::isMet)) {
return createJob(jobSpec, constraintSpecs);
}
return Stream.of(constraints).allMatch(Constraint::isMet);
});
if (jobSpec == null) {
return null;
}
return null;
List<ConstraintSpec> constraintSpecs = jobStorage.getConstraintSpecs(jobSpec.getId());
return createJob(jobSpec, constraintSpecs);
}
private @NonNull Job createJob(@NonNull JobSpec jobSpec, @NonNull List<ConstraintSpec> constraintSpecs) {

View File

@@ -15,6 +15,7 @@ import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.jobs.MinimalJobSpec;
import org.thoughtcrime.securesms.util.Debouncer;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util;
@@ -46,6 +47,8 @@ public class JobManager implements ConstraintObserver.Notifier {
public static final int CURRENT_VERSION = 12;
private static final Predicate<MinimalJobSpec> NO_PREDICATE = spec -> true;
private final Application application;
private final Configuration configuration;
private final Executor executor;
@@ -109,10 +112,10 @@ public class JobManager implements ConstraintObserver.Notifier {
int id = 0;
for (int i = 0; i < configuration.getJobThreadCount(); i++) {
new JobRunner(application, ++id, jobController, JobPredicate.NONE).start();
new JobRunner(application, ++id, jobController, NO_PREDICATE).start();
}
for (JobPredicate predicate : configuration.getReservedJobRunners()) {
for (Predicate<MinimalJobSpec> predicate : configuration.getReservedJobRunners()) {
new JobRunner(application, ++id, jobController, predicate).start();
}
@@ -578,15 +581,15 @@ public class JobManager implements ConstraintObserver.Notifier {
public static class Configuration {
private final ExecutorFactory executorFactory;
private final int jobThreadCount;
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final List<ConstraintObserver> constraintObservers;
private final JobStorage jobStorage;
private final JobMigrator jobMigrator;
private final JobTracker jobTracker;
private final List<JobPredicate> reservedJobRunners;
private final ExecutorFactory executorFactory;
private final int jobThreadCount;
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final List<ConstraintObserver> constraintObservers;
private final JobStorage jobStorage;
private final JobMigrator jobMigrator;
private final JobTracker jobTracker;
private final List<Predicate<MinimalJobSpec>> reservedJobRunners;
private Configuration(int jobThreadCount,
@NonNull ExecutorFactory executorFactory,
@@ -596,7 +599,7 @@ public class JobManager implements ConstraintObserver.Notifier {
@NonNull JobStorage jobStorage,
@NonNull JobMigrator jobMigrator,
@NonNull JobTracker jobTracker,
@NonNull List<JobPredicate> reservedJobRunners)
@NonNull List<Predicate<MinimalJobSpec>> reservedJobRunners)
{
this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount;
@@ -642,7 +645,7 @@ public class JobManager implements ConstraintObserver.Notifier {
return jobTracker;
}
@NonNull List<JobPredicate> getReservedJobRunners() {
@NonNull List<Predicate<MinimalJobSpec>> getReservedJobRunners() {
return reservedJobRunners;
}
@@ -656,14 +659,14 @@ public class JobManager implements ConstraintObserver.Notifier {
private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker();
private List<JobPredicate> reservedJobRunners = new ArrayList<>();
private List<Predicate<MinimalJobSpec>> reservedJobRunners = new ArrayList<>();
public @NonNull Builder setJobThreadCount(int jobThreadCount) {
this.jobThreadCount = jobThreadCount;
return this;
}
public @NonNull Builder addReservedJobRunner(@NonNull JobPredicate predicate) {
public @NonNull Builder addReservedJobRunner(@NonNull Predicate<MinimalJobSpec> predicate) {
this.reservedJobRunners.add(predicate);
return this;
}

View File

@@ -1,11 +0,0 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
public interface JobPredicate {
JobPredicate NONE = jobSpec -> true;
boolean shouldRun(@NonNull JobSpec jobSpec);
}

View File

@@ -8,10 +8,12 @@ import androidx.annotation.NonNull;
import com.annimon.stream.Stream;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.jobs.MinimalJobSpec;
import org.thoughtcrime.securesms.util.WakeLockUtil;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
* A thread that constantly checks for available {@link Job}s owned by the {@link JobController}.
@@ -27,12 +29,12 @@ class JobRunner extends Thread {
private static long WAKE_LOCK_TIMEOUT = TimeUnit.MINUTES.toMillis(10);
private final Application application;
private final int id;
private final JobController jobController;
private final JobPredicate jobPredicate;
private final Application application;
private final int id;
private final JobController jobController;
private final Predicate<MinimalJobSpec> jobPredicate;
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull JobPredicate predicate) {
JobRunner(@NonNull Application application, int id, @NonNull JobController jobController, @NonNull Predicate<MinimalJobSpec> predicate) {
super("signal-JobRunner-" + id);
this.application = application;

View File

@@ -1,18 +1,16 @@
package org.thoughtcrime.securesms.jobmanager.impl;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.jobmanager.JobPredicate;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.jobs.MinimalJobSpec;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
/**
* A {@link JobPredicate} that will only run jobs with the provided factory keys.
* A {@link Predicate} that will only run jobs with the provided factory keys.
*/
public final class FactoryJobPredicate implements JobPredicate {
public final class FactoryJobPredicate implements Predicate<MinimalJobSpec> {
private final Set<String> factories;
@@ -21,7 +19,7 @@ public final class FactoryJobPredicate implements JobPredicate {
}
@Override
public boolean shouldRun(@NonNull JobSpec jobSpec) {
return factories.contains(jobSpec.getFactoryKey());
public boolean test(MinimalJobSpec minimalJobSpec) {
return factories.contains(minimalJobSpec.getFactoryKey());
}
}
}

View File

@@ -1,6 +1,7 @@
package org.thoughtcrime.securesms.jobmanager.persistence
import androidx.annotation.WorkerThread
import org.thoughtcrime.securesms.jobs.MinimalJobSpec
import java.util.function.Predicate
interface JobStorage {
@@ -17,7 +18,7 @@ interface JobStorage {
fun getAllMatchingFilter(predicate: Predicate<JobSpec>): List<JobSpec>
@WorkerThread
fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec>
fun getNextEligibleJob(currentTime: Long, filter: (MinimalJobSpec) -> Boolean): JobSpec?
@WorkerThread
fun getJobsInQueue(queue: String): List<JobSpec>

View File

@@ -27,7 +27,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
/**
* We keep a set of job specs in memory to facilitate fast retrieval. This is important because the most common job storage pattern is
* [getPendingJobsWithNoDependenciesInCreatedOrder], which needs to return full specs.
* [getNextEligibleJob], which needs to return full specs.
*/
private val jobSpecCache: LRUCache<String, JobSpec> = LRUCache(JOB_CACHE_LIMIT)
@@ -41,7 +41,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
/** We keep every dependency in memory, since there aren't that many, and managing a limited subset would be very complicated. */
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = hashMapOf()
/** The list of jobs eligible to be returned from [getPendingJobsWithNoDependenciesInCreatedOrder], kept sorted in the appropriate order. */
/** The list of jobs eligible to be returned from [getNextEligibleJob], kept sorted in the appropriate order. */
private val eligibleJobs: TreeSet<MinimalJobSpec> = TreeSet(EligibleMinJobComparator)
/** All migration-related jobs, kept in the appropriate order. */
@@ -124,16 +124,16 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
@Synchronized
override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec> {
override fun getNextEligibleJob(currentTime: Long, filter: (MinimalJobSpec) -> Boolean): JobSpec? {
val stopwatch = debugStopwatch("get-pending")
val migrationJob: MinimalJobSpec? = migrationJobs.firstOrNull()
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) {
listOf(migrationJob.toJobSpec())
migrationJob.toJobSpec()
} else if (migrationJob != null) {
emptyList()
null
} else {
val minJobs: List<MinimalJobSpec> = eligibleJobs
eligibleJobs
.asSequence()
.filter { job ->
// Filter out all jobs with unmet dependencies
@@ -141,9 +141,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
.filterNot { it.isRunning }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.toList()
getFullJobs(minJobs)
.firstOrNull(filter)
?.toJobSpec()
}.also {
stopwatch?.stop(TAG)
}
@@ -521,21 +520,6 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
private fun getFullJobs(minJobs: Collection<MinimalJobSpec>): List<JobSpec> {
val requestedKeys = minJobs.map { it.id }.toSet()
val cachedKeys = jobSpecCache.keys.intersect(requestedKeys)
val uncachedKeys = requestedKeys.subtract(cachedKeys)
val cachedJobs = cachedKeys.map { jobSpecCache[it]!! }
val fetchedJobs = jobDatabase.getJobSpecsByKeys(uncachedKeys)
val sorted = TreeSet(EligibleFullJobComparator).apply {
addAll(cachedJobs)
addAll(fetchedJobs)
}
return sorted.toList()
}
private object EligibleMinJobComparator : Comparator<MinimalJobSpec> {
override fun compare(o1: MinimalJobSpec, o2: MinimalJobSpec): Int {
// We want to sort by priority descending, then createTime ascending