diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/jobs/JobManagerPerformanceTests.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/jobs/JobManagerPerformanceTests.kt new file mode 100644 index 0000000000..b8876daaee --- /dev/null +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/jobs/JobManagerPerformanceTests.kt @@ -0,0 +1,117 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import android.app.Application +import androidx.test.ext.junit.runners.AndroidJUnit4 +import org.junit.Ignore +import org.junit.Test +import org.junit.runner.RunWith +import org.signal.core.util.EventTimer +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.database.JobDatabase.Companion.getInstance +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.JobManager +import org.thoughtcrime.securesms.jobmanager.JobMigrator +import org.thoughtcrime.securesms.jobmanager.JobTracker +import org.thoughtcrime.securesms.util.TextSecurePreferences +import java.util.concurrent.CountDownLatch +import kotlin.random.Random + +@Ignore("This is just for testing performance, not correctness, and they can therefore take a long time. Run them manually when you need to.") +@RunWith(AndroidJUnit4::class) +class JobManagerPerformanceTests { + + companion object { + val TAG = Log.tag(JobManagerPerformanceTests::class.java) + } + + @Test + fun testPerformance_singleQueue() { + runTest(2000) { TestJob(queue = "queue") } + } + + @Test + fun testPerformance_fourQueues() { + runTest(2000) { TestJob(queue = "queue-${Random.nextInt(1, 5)}") } + } + + @Test + fun testPerformance_noQueues() { + runTest(2000) { TestJob(queue = null) } + } + + private fun runTest(count: Int, jobCreator: () -> TestJob) { + val context = AppDependencies.application + val jobManager = testJobManager(context) + + jobManager.beginJobLoop() + + val eventTimer = EventTimer() + + val latch = CountDownLatch(count) + var seenStart = false + jobManager.addListener({ it.factoryKey == TestJob.KEY }) { _, state -> + if (!seenStart && state == JobTracker.JobState.RUNNING) { + // Adding the jobs can take a while (and runs on a background thread), so we want to reset the timer the first time we see a job run so the first job + // doesn't have a skewed time + eventTimer.reset() + seenStart = true + } + if (state.isComplete) { + eventTimer.emit("job") + latch.countDown() + if (latch.count % 100 == 0L) { + Log.d(TAG, "Finished ${count - latch.count}/$count jobs") + } + } + } + + Log.i(TAG, "Adding jobs...") + jobManager.addAll((1..count).map { jobCreator() }) + + Log.i(TAG, "Waiting for jobs to complete...") + latch.await() + Log.i(TAG, "Jobs complete!") + Log.i(TAG, eventTimer.stop().summary) + } + + private fun testJobManager(context: Application): JobManager { + val config = JobManager.Configuration.Builder() + .setJobFactories( + JobManagerFactories.getJobFactories(context) + mapOf( + TestJob.KEY to TestJob.Factory() + ) + ) + .setConstraintFactories(JobManagerFactories.getConstraintFactories(context)) + .setConstraintObservers(JobManagerFactories.getConstraintObservers(context)) + .setJobStorage(FastJobStorage(getInstance(context))) + .setJobMigrator(JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context))) + .build() + + return JobManager(context, config) + } + + private class TestJob(params: Parameters) : Job(params) { + companion object { + const val KEY = "test" + } + + constructor(queue: String?) : this(Parameters.Builder().setQueue(queue).build()) + + override fun serialize(): ByteArray? = null + override fun getFactoryKey(): String = KEY + override fun run(): Result = Result.success() + override fun onFailure() = Unit + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): TestJob { + return TestJob(parameters) + } + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt index d298fd3807..dab3fd824e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt @@ -197,6 +197,40 @@ class JobDatabase( .readToList { it.toJobSpec() } } + @Synchronized + fun getJobSpecsByKeys(keys: Collection): List { + if (keys.isEmpty()) { + return emptyList() + } + + val output: MutableList = ArrayList(keys.size) + + for (query in SqlUtil.buildCollectionQuery(Jobs.JOB_SPEC_ID, keys)) { + readableDatabase + .select() + .from(Jobs.TABLE_NAME) + .where(query.where, query.whereArgs) + .run() + .forEach { + output += it.toJobSpec() + } + } + + return output + } + + @Synchronized + fun getMostEligibleJobInQueue(queue: String): JobSpec? { + return readableDatabase + .select() + .from(Jobs.TABLE_NAME) + .where("${Jobs.QUEUE_KEY} = ?", queue) + .orderBy("${Jobs.PRIORITY} DESC, ${Jobs.CREATE_TIME} ASC, ${Jobs.ID} ASC") + .limit(1) + .run() + .readToSingleObject { it.toJobSpec() } + } + @Synchronized fun getAllMatchingFilter(predicate: Predicate): List { val output: MutableList = mutableListOf() diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt index 4b7dfaa5b3..137c60bd83 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt @@ -1,6 +1,8 @@ package org.thoughtcrime.securesms.jobs import androidx.annotation.VisibleForTesting +import org.signal.core.util.Stopwatch +import org.signal.core.util.logging.Log import org.thoughtcrime.securesms.database.JobDatabase import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec @@ -15,7 +17,9 @@ import java.util.function.Predicate class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { companion object { + private val TAG = Log.tag(FastJobStorage::class) private const val JOB_CACHE_LIMIT = 1000 + private const val DEBUG = false } /** We keep a trimmed down version of every job in memory. */ @@ -38,7 +42,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { private val dependenciesByJobId: MutableMap> = hashMapOf() /** The list of jobs eligible to be returned from [getPendingJobsWithNoDependenciesInCreatedOrder], kept sorted in the appropriate order. */ - private val eligibleJobs: TreeSet = TreeSet(EligibleJobComparator) + private val eligibleJobs: TreeSet = TreeSet(EligibleMinJobComparator) /** All migration-related jobs, kept in the appropriate order. */ private val migrationJobs: TreeSet = TreeSet(compareBy { it.createTime }) @@ -48,7 +52,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { @Synchronized override fun init() { + val stopwatch = Stopwatch("init", decimalPlaces = 2) minimalJobs += jobDatabase.getAllMinimalJobSpecs() + stopwatch.split("fetch-min-jobs") for (job in minimalJobs) { if (job.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) { @@ -57,29 +63,37 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { placeJobInEligibleList(job) } } + stopwatch.split("sort-min-jobs") jobDatabase.getJobSpecs(JOB_CACHE_LIMIT).forEach { jobSpecCache[it.id] = it } + stopwatch.split("fetch-full-jobs") for (constraintSpec in jobDatabase.getConstraintSpecsForJobs(jobSpecCache.keys)) { val jobConstraints: MutableList = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() } jobConstraints += constraintSpec } + stopwatch.split("fetch-constraints") for (dependencySpec in jobDatabase.getAllDependencySpecs().filterNot { it.hasCircularDependency() }) { val jobDependencies: MutableList = dependenciesByJobId.getOrPut(dependencySpec.jobId) { mutableListOf() } jobDependencies += dependencySpec } + stopwatch.split("fetch-dependencies") + + stopwatch.stop(TAG) } @Synchronized override fun insertJobs(fullSpecs: List) { + val stopwatch = debugStopwatch("insert") val durable: List = fullSpecs.filterNot { it.isMemoryOnly } if (durable.isNotEmpty()) { jobDatabase.insertJobs(durable) } + stopwatch?.split("db") for (fullSpec in fullSpecs) { val minimalJobSpec = fullSpec.jobSpec.toMinimalJobSpec() @@ -95,6 +109,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs.toMutableList() dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs.toMutableList() } + stopwatch?.split("cache") + stopwatch?.stop(TAG) } @Synchronized @@ -109,6 +125,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { @Synchronized override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List { + val stopwatch = debugStopwatch("get-pending") val migrationJob: MinimalJobSpec? = migrationJobs.firstOrNull() return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) { @@ -116,7 +133,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } else if (migrationJob != null) { emptyList() } else { - eligibleJobs + val minJobs: List = eligibleJobs .asSequence() .filter { job -> // Filter out all jobs with unmet dependencies @@ -124,8 +141,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } .filterNot { it.isRunning } .filter { job -> job.hasEligibleRunTime(currentTime) } - .map { it.toJobSpec() } .toList() + + getFullJobs(minJobs) + }.also { + stopwatch?.stop(TAG) } } @@ -282,6 +302,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { .map { it.toMinimalJobSpec() } .toSet() + val affectedQueues: Set = minimalJobsToDelete.mapNotNull { it.queueKey }.toSet() + if (durableJobIdsToDelete.isNotEmpty()) { jobDatabase.deleteJobs(durableJobIdsToDelete) } @@ -292,6 +314,15 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { eligibleJobs.removeAll(minimalJobsToDelete) migrationJobs.removeAll(minimalJobsToDelete) + mostEligibleJobForQueue.keys.removeAll(affectedQueues) + + for (queue in affectedQueues) { + jobDatabase.getMostEligibleJobInQueue(queue)?.let { + jobSpecCache[it.id] = it + placeJobInEligibleList(it.toMinimalJobSpec()) + } + } + for (jobId in ids) { constraintsByJobId.remove(jobId) dependenciesByJobId.remove(jobId) @@ -490,7 +521,22 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } } - private object EligibleJobComparator : Comparator { + private fun getFullJobs(minJobs: Collection): List { + val requestedKeys = minJobs.map { it.id }.toSet() + val cachedKeys = jobSpecCache.keys.intersect(requestedKeys) + val uncachedKeys = requestedKeys.subtract(cachedKeys) + + val cachedJobs = cachedKeys.map { jobSpecCache[it]!! } + val fetchedJobs = jobDatabase.getJobSpecsByKeys(uncachedKeys) + + val sorted = TreeSet(EligibleFullJobComparator).apply { + addAll(cachedJobs) + addAll(fetchedJobs) + } + return sorted.toList() + } + + private object EligibleMinJobComparator : Comparator { override fun compare(o1: MinimalJobSpec, o2: MinimalJobSpec): Int { // We want to sort by priority descending, then createTime ascending @@ -506,6 +552,25 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } } } + + /** + * Identical to [EligibleMinJobComparator], but for full jobs. + */ + private object EligibleFullJobComparator : Comparator { + override fun compare(o1: JobSpec, o2: JobSpec): Int { + return when { + o1.priority > o2.priority -> -1 + o1.priority < o2.priority -> 1 + o1.createTime < o2.createTime -> -1 + o1.createTime > o2.createTime -> 1 + else -> o1.id.compareTo(o2.id) + } + } + } + + private fun debugStopwatch(label: String): Stopwatch? { + return if (DEBUG) Stopwatch(label, decimalPlaces = 2) else null + } } /** diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt index b93f3d64b4..dafb2159d9 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt @@ -533,6 +533,33 @@ class FastJobStorageTest { jobs.contains(DataSet1.JOB_1) assertIs false } + @Test + fun `getPendingJobsWithNoDependenciesInCreatedOrder - after deleted, next item in queue is eligible`() { + // Two jobs in the same queue but with different create times + val firstJob = DataSet1.JOB_1 + val secondJob = DataSet1.JOB_1.copy(id = "id2", createTime = 2) + val subject = FastJobStorage( + mockDatabase( + fullSpecs = listOf( + FullSpec(jobSpec = firstJob, constraintSpecs = emptyList(), dependencySpecs = emptyList()), + FullSpec(jobSpec = secondJob, constraintSpecs = emptyList(), dependencySpecs = emptyList()) + ) + ) + ) + subject.init() + + var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100) + jobs.size assertIs 1 + jobs.contains(firstJob) assertIs true + + subject.deleteJobs(listOf("id1")) + + jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100) + jobs.size assertIs 1 + jobs.contains(firstJob) assertIs false + jobs.contains(secondJob) assertIs true + } + @Test fun `getPendingJobsWithNoDependenciesInCreatedOrder - after marked running, no longer is in eligible list`() { val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS)) @@ -798,6 +825,10 @@ class FastJobStorageTest { every { mock.getAllDependencySpecs() } returns dependencies every { mock.getConstraintSpecsForJobs(any()) } returns constraints every { mock.getJobSpec(any()) } answers { jobs.first { it.id == firstArg() } } + every { mock.getJobSpecsByKeys(any()) } answers { + val ids: Collection = firstArg() + jobs.filter { ids.contains(it.id) } + } every { mock.insertJobs(any()) } answers { val inserts: List = firstArg() for (insert in inserts) { @@ -863,6 +894,12 @@ class FastJobStorageTest { } } } + every { mock.getMostEligibleJobInQueue(any()) } answers { + jobs + .filter { it.queueKey == firstArg() } + .sortedByDescending { it.priority } + .minByOrNull { it.createTime } + } return mock } diff --git a/core-util-jvm/src/main/java/org/signal/core/util/Stopwatch.kt b/core-util-jvm/src/main/java/org/signal/core/util/Stopwatch.kt index 798724e705..d9315ca6f1 100644 --- a/core-util-jvm/src/main/java/org/signal/core/util/Stopwatch.kt +++ b/core-util-jvm/src/main/java/org/signal/core/util/Stopwatch.kt @@ -8,7 +8,6 @@ package org.signal.core.util import org.signal.core.util.logging.Log import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.DurationUnit -import kotlin.time.ExperimentalTime import kotlin.time.measureTimedValue /** @@ -83,7 +82,6 @@ class Stopwatch @JvmOverloads constructor(private val title: String, private val /** * Logs how long it takes to perform the operation. */ -@OptIn(ExperimentalTime::class) inline fun logTime(tag: String, label: String, decimalPlaces: Int = 0, block: () -> T): T { val result = measureTimedValue(block) Log.d(tag, "$label: ${result.duration.toDouble(DurationUnit.MILLISECONDS).roundedString(decimalPlaces)}") diff --git a/core-util/src/main/java/org/signal/core/util/EventTimer.kt b/core-util/src/main/java/org/signal/core/util/EventTimer.kt index 47c90eecba..70d6d5b66a 100644 --- a/core-util/src/main/java/org/signal/core/util/EventTimer.kt +++ b/core-util/src/main/java/org/signal/core/util/EventTimer.kt @@ -23,9 +23,15 @@ class EventTimer { private val durationsByGroup: MutableMap> = mutableMapOf() - private val startTime = System.nanoTime() + private var startTime = System.nanoTime() private var lastTimeNanos: Long = startTime + fun reset() { + startTime = System.nanoTime() + lastTimeNanos = startTime + durationsByGroup.clear() + } + /** * Indicates an event in the specified group has finished. */