From 86cf8200b5d121af66f73c0bded873203afc6547 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Wed, 17 Jul 2024 11:58:09 -0400 Subject: [PATCH] Remove cases where all jobs were expected to be in memory. --- .../securesms/database/JobDatabase.kt | 184 +++++++++++------- .../securesms/jobmanager/JobController.java | 23 +-- .../securesms/jobmanager/JobMigration.kt | 27 +++ .../securesms/jobmanager/JobMigrator.java | 52 +++-- .../jobmanager/persistence/JobStorage.java | 68 ------- .../jobmanager/persistence/JobStorage.kt | 69 +++++++ .../securesms/jobs/FastJobStorage.kt | 112 +++++++---- .../securesms/jobmanager/JobMigratorTest.java | 13 +- .../securesms/jobs/FastJobStorageTest.kt | 109 +++++++++-- .../core/util/SQLiteDatabaseExtensions.kt | 4 + 10 files changed, 422 insertions(+), 239 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt index 924c22be83..d298fd3807 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt @@ -8,8 +8,10 @@ import androidx.core.content.contentValuesOf import net.zetetic.database.sqlcipher.SQLiteDatabase import net.zetetic.database.sqlcipher.SQLiteOpenHelper import org.signal.core.util.CursorUtil +import org.signal.core.util.SqlUtil import org.signal.core.util.concurrent.SignalExecutors import org.signal.core.util.delete +import org.signal.core.util.forEach import org.signal.core.util.insertInto import org.signal.core.util.logging.Log import org.signal.core.util.readToList @@ -31,6 +33,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobs.MinimalJobSpec +import java.util.function.Predicate class JobDatabase( application: Application, @@ -184,28 +187,32 @@ class JobDatabase( } @Synchronized - fun getAllJobSpecs(): List { - return readableDatabase - .select() - .from(Jobs.TABLE_NAME) - .orderBy("${Jobs.CREATE_TIME}, ${Jobs.ID} ASC") - .run() - .readToList { cursor -> - jobSpecFromCursor(cursor) - } - } - - @Synchronized - fun getOldestJobSpecs(limit: Int): List { + fun getJobSpecs(limit: Int): List { return readableDatabase .select() .from(Jobs.TABLE_NAME) .orderBy("${Jobs.CREATE_TIME}, ${Jobs.ID} ASC") .limit(limit) .run() + .readToList { it.toJobSpec() } + } + + @Synchronized + fun getAllMatchingFilter(predicate: Predicate): List { + val output: MutableList = mutableListOf() + + readableDatabase + .select() + .from(Jobs.TABLE_NAME) + .run() .readToList { cursor -> - jobSpecFromCursor(cursor) + val jobSpec = cursor.toJobSpec() + if (predicate.test(jobSpec)) { + output += jobSpec + } } + + return output } @Synchronized @@ -215,9 +222,7 @@ class JobDatabase( .from(Jobs.TABLE_NAME) .where("${Jobs.JOB_SPEC_ID} = ?", id) .run() - .readToSingleObject { - jobSpecFromCursor(it) - } + .readToSingleObject { it.toJobSpec() } } @Synchronized @@ -296,26 +301,41 @@ class JobDatabase( .filterNot { it.isMemoryOnly } .forEach { job -> db.update(Jobs.TABLE_NAME) - .values( - Jobs.JOB_SPEC_ID to job.id, - Jobs.FACTORY_KEY to job.factoryKey, - Jobs.QUEUE_KEY to job.queueKey, - Jobs.CREATE_TIME to job.createTime, - Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime, - Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval, - Jobs.RUN_ATTEMPT to job.runAttempt, - Jobs.MAX_ATTEMPTS to job.maxAttempts, - Jobs.LIFESPAN to job.lifespan, - Jobs.SERIALIZED_DATA to job.serializedData, - Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData, - Jobs.IS_RUNNING to if (job.isRunning) 1 else 0 - ) + .values(job.toContentValues()) .where("${Jobs.JOB_SPEC_ID} = ?", job.id) .run() } } } + @Synchronized + fun transformJobs(transformer: (JobSpec) -> JobSpec): List { + val transformed: MutableList = mutableListOf() + + writableDatabase.withinTransaction { db -> + readableDatabase + .select() + .from(Jobs.TABLE_NAME) + .run() + .forEach { cursor -> + val jobSpec = cursor.toJobSpec() + val updated = transformer(jobSpec) + if (updated != jobSpec) { + transformed += updated + } + } + + for (job in transformed) { + db.update(Jobs.TABLE_NAME) + .values(job.toContentValues()) + .where("${Jobs.JOB_SPEC_ID} = ?", job.id) + .run() + } + } + + return transformed + } + @Synchronized fun deleteJobs(jobIds: List) { writableDatabase.withinTransaction { db -> @@ -340,14 +360,30 @@ class JobDatabase( } @Synchronized - fun getAllConstraintSpecs(): List { + fun getConstraintSpecs(limit: Int): List { return readableDatabase .select() .from(Constraints.TABLE_NAME) + .limit(limit) .run() - .readToList { cursor -> - constraintSpecFromCursor(cursor) - } + .readToList { it.toConstraintSpec() } + } + + fun getConstraintSpecsForJobs(jobIds: Collection): List { + val output: MutableList = mutableListOf() + + for (query in SqlUtil.buildCollectionQuery(Constraints.JOB_SPEC_ID, jobIds)) { + readableDatabase + .select() + .from(Constraints.TABLE_NAME) + .where(query.where, query.whereArgs) + .run() + .forEach { + output += it.toConstraintSpec() + } + } + + return output } @Synchronized @@ -356,9 +392,7 @@ class JobDatabase( .select() .from(Dependencies.TABLE_NAME) .run() - .readToList { cursor -> - dependencySpecFromCursor(cursor) - } + .readToList { it.toDependencySpec() } } private fun insertJobSpec(db: SQLiteDatabase, job: JobSpec) { @@ -369,21 +403,7 @@ class JobDatabase( check(db.inTransaction()) db.insertInto(Jobs.TABLE_NAME) - .values( - Jobs.JOB_SPEC_ID to job.id, - Jobs.FACTORY_KEY to job.factoryKey, - Jobs.QUEUE_KEY to job.queueKey, - Jobs.CREATE_TIME to job.createTime, - Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime, - Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval, - Jobs.RUN_ATTEMPT to job.runAttempt, - Jobs.MAX_ATTEMPTS to job.maxAttempts, - Jobs.LIFESPAN to job.lifespan, - Jobs.SERIALIZED_DATA to job.serializedData, - Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData, - Jobs.IS_RUNNING to if (job.isRunning) 1 else 0, - Jobs.PRIORITY to job.priority - ) + .values(job.toContentValues()) .run(SQLiteDatabase.CONFLICT_IGNORE) } @@ -417,37 +437,37 @@ class JobDatabase( } } - private fun jobSpecFromCursor(cursor: Cursor): JobSpec { + private fun Cursor.toJobSpec(): JobSpec { return JobSpec( - id = cursor.requireNonNullString(Jobs.JOB_SPEC_ID), - factoryKey = cursor.requireNonNullString(Jobs.FACTORY_KEY), - queueKey = cursor.requireString(Jobs.QUEUE_KEY), - createTime = cursor.requireLong(Jobs.CREATE_TIME), - lastRunAttemptTime = cursor.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME), - nextBackoffInterval = cursor.requireLong(Jobs.NEXT_BACKOFF_INTERVAL), - runAttempt = cursor.requireInt(Jobs.RUN_ATTEMPT), - maxAttempts = cursor.requireInt(Jobs.MAX_ATTEMPTS), - lifespan = cursor.requireLong(Jobs.LIFESPAN), - serializedData = cursor.requireBlob(Jobs.SERIALIZED_DATA), - serializedInputData = cursor.requireBlob(Jobs.SERIALIZED_INPUT_DATA), - isRunning = cursor.requireBoolean(Jobs.IS_RUNNING), + id = this.requireNonNullString(Jobs.JOB_SPEC_ID), + factoryKey = this.requireNonNullString(Jobs.FACTORY_KEY), + queueKey = this.requireString(Jobs.QUEUE_KEY), + createTime = this.requireLong(Jobs.CREATE_TIME), + lastRunAttemptTime = this.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME), + nextBackoffInterval = this.requireLong(Jobs.NEXT_BACKOFF_INTERVAL), + runAttempt = this.requireInt(Jobs.RUN_ATTEMPT), + maxAttempts = this.requireInt(Jobs.MAX_ATTEMPTS), + lifespan = this.requireLong(Jobs.LIFESPAN), + serializedData = this.requireBlob(Jobs.SERIALIZED_DATA), + serializedInputData = this.requireBlob(Jobs.SERIALIZED_INPUT_DATA), + isRunning = this.requireBoolean(Jobs.IS_RUNNING), isMemoryOnly = false, - priority = cursor.requireInt(Jobs.PRIORITY) + priority = this.requireInt(Jobs.PRIORITY) ) } - private fun constraintSpecFromCursor(cursor: Cursor): ConstraintSpec { + private fun Cursor.toConstraintSpec(): ConstraintSpec { return ConstraintSpec( - jobSpecId = cursor.requireNonNullString(Constraints.JOB_SPEC_ID), - factoryKey = cursor.requireNonNullString(Constraints.FACTORY_KEY), + jobSpecId = this.requireNonNullString(Constraints.JOB_SPEC_ID), + factoryKey = this.requireNonNullString(Constraints.FACTORY_KEY), isMemoryOnly = false ) } - private fun dependencySpecFromCursor(cursor: Cursor): DependencySpec { + private fun Cursor.toDependencySpec(): DependencySpec { return DependencySpec( - jobId = cursor.requireNonNullString(Dependencies.JOB_SPEC_ID), - dependsOnJobId = cursor.requireNonNullString(Dependencies.DEPENDS_ON_JOB_SPEC_ID), + jobId = this.requireNonNullString(Dependencies.JOB_SPEC_ID), + dependsOnJobId = this.requireNonNullString(Dependencies.DEPENDS_ON_JOB_SPEC_ID), isMemoryOnly = false ) } @@ -468,6 +488,24 @@ class JobDatabase( writableDatabase.update(Jobs.TABLE_NAME, contentValuesOf(Jobs.NEXT_BACKOFF_INTERVAL to 0), null, null) } + private fun JobSpec.toContentValues(): ContentValues { + return contentValuesOf( + Jobs.JOB_SPEC_ID to this.id, + Jobs.FACTORY_KEY to this.factoryKey, + Jobs.QUEUE_KEY to this.queueKey, + Jobs.CREATE_TIME to this.createTime, + Jobs.LAST_RUN_ATTEMPT_TIME to this.lastRunAttemptTime, + Jobs.NEXT_BACKOFF_INTERVAL to this.nextBackoffInterval, + Jobs.RUN_ATTEMPT to this.runAttempt, + Jobs.MAX_ATTEMPTS to this.maxAttempts, + Jobs.LIFESPAN to this.lifespan, + Jobs.SERIALIZED_DATA to this.serializedData, + Jobs.SERIALIZED_INPUT_DATA to this.serializedInputData, + Jobs.IS_RUNNING to if (this.isRunning) 1 else 0, + Jobs.PRIORITY to this.priority + ) + } + companion object { private val TAG = Log.tag(JobDatabase::class.java) private const val DATABASE_VERSION = 3 diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index e0e2bb5e90..7bb5937ec4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -231,26 +231,13 @@ class JobController { @WorkerThread synchronized void update(@NonNull JobUpdater updater) { - List allJobs = jobStorage.getAllJobSpecs(); - List updatedJobs = new LinkedList<>(); - - for (JobSpec job : allJobs) { - JobSpec updated = updater.update(job); - if (updated != job) { - updatedJobs.add(updated); - } - } - - jobStorage.updateJobs(updatedJobs); - + jobStorage.transformJobs(updater::update); notifyAll(); } @WorkerThread synchronized List findJobs(@NonNull Predicate predicate) { - return Stream.of(jobStorage.getAllJobSpecs()) - .filter(predicate::test) - .toList(); + return jobStorage.getAllMatchingFilter(predicate); } @WorkerThread @@ -360,9 +347,9 @@ class JobController { */ @WorkerThread synchronized @NonNull String getDebugInfo() { - List jobs = jobStorage.getAllJobSpecs(); - List constraints = jobStorage.getAllConstraintSpecs(); - List dependencies = jobStorage.getAllDependencySpecs(); + List jobs = jobStorage.debugGetJobSpecs(1000); + List constraints = jobStorage.debugGetConstraintSpecs(1000); + List dependencies = jobStorage.debugGetAllDependencySpecs(); StringBuilder info = new StringBuilder(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigration.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigration.kt index 931e6da1d8..cc0a2d1e1d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigration.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigration.kt @@ -33,6 +33,33 @@ abstract class JobMigration protected constructor(val endVersion: Int) { return copy(data = newData) } + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as JobData + + if (factoryKey != other.factoryKey) return false + if (queueKey != other.queueKey) return false + if (maxAttempts != other.maxAttempts) return false + if (lifespan != other.lifespan) return false + if (data != null) { + if (other.data == null) return false + if (!data.contentEquals(other.data)) return false + } else if (other.data != null) return false + + return true + } + + override fun hashCode(): Int { + var result = factoryKey.hashCode() + result = 31 * result + (queueKey?.hashCode() ?: 0) + result = 31 * result + maxAttempts + result = 31 * result + lifespan.hashCode() + result = 31 * result + (data?.contentHashCode() ?: 0) + return result + } + companion object { @JvmField val FAILING_JOB_DATA = JobData("FailingJob", null, -1, -1, null) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java index 0996311ce3..60ef3da6bc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java @@ -11,7 +11,6 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage; import java.util.HashMap; import java.util.List; -import java.util.ListIterator; import java.util.Map; @SuppressLint("UseSparseArrays") @@ -47,41 +46,36 @@ public class JobMigrator { * @return The version that has been migrated to. */ int migrate(@NonNull JobStorage jobStorage) { - List jobSpecs = jobStorage.getAllJobSpecs(); - for (int i = lastSeenVersion; i < currentVersion; i++) { Log.i(TAG, "Migrating from " + i + " to " + (i + 1)); - - ListIterator iter = jobSpecs.listIterator(); - JobMigration migration = migrations.get(i + 1); - + JobMigration migration = migrations.get(i + 1); assert migration != null; - while (iter.hasNext()) { - JobSpec jobSpec = iter.next(); - JobData originalJobData = new JobData(jobSpec.getFactoryKey(), jobSpec.getQueueKey(), jobSpec.getMaxAttempts(), jobSpec.getLifespan(), jobSpec.getSerializedData()); - JobData updatedJobData = migration.migrate(originalJobData); - JobSpec updatedJobSpec = new JobSpec(jobSpec.getId(), - updatedJobData.getFactoryKey(), - updatedJobData.getQueueKey(), - jobSpec.getCreateTime(), - jobSpec.getLastRunAttemptTime(), - jobSpec.getNextBackoffInterval(), - jobSpec.getRunAttempt(), - updatedJobData.getMaxAttempts(), - updatedJobData.getLifespan(), - updatedJobData.getData(), - jobSpec.getSerializedInputData(), - jobSpec.isRunning(), - jobSpec.isMemoryOnly(), - jobSpec.getPriority()); + jobStorage.transformJobs(jobSpec -> { + JobData originalJobData = new JobData(jobSpec.getFactoryKey(), jobSpec.getQueueKey(), jobSpec.getMaxAttempts(), jobSpec.getLifespan(), jobSpec.getSerializedData()); + JobData updatedJobData = migration.migrate(originalJobData); - iter.set(updatedJobSpec); - } + if (updatedJobData == originalJobData) { + return jobSpec; + } + + return new JobSpec(jobSpec.getId(), + updatedJobData.getFactoryKey(), + updatedJobData.getQueueKey(), + jobSpec.getCreateTime(), + jobSpec.getLastRunAttemptTime(), + jobSpec.getNextBackoffInterval(), + jobSpec.getRunAttempt(), + updatedJobData.getMaxAttempts(), + updatedJobData.getLifespan(), + updatedJobData.getData(), + jobSpec.getSerializedInputData(), + jobSpec.isRunning(), + jobSpec.isMemoryOnly(), + jobSpec.getPriority()); + }); } - jobStorage.updateJobs(jobSpecs); - return currentVersion; } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java deleted file mode 100644 index 685e006d3c..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.thoughtcrime.securesms.jobmanager.persistence; - -import androidx.annotation.NonNull; -import androidx.annotation.Nullable; -import androidx.annotation.WorkerThread; - -import java.util.List; -import java.util.Set; - -public interface JobStorage { - - @WorkerThread - void init(); - - @WorkerThread - void insertJobs(@NonNull List fullSpecs); - - @WorkerThread - @Nullable JobSpec getJobSpec(@NonNull String id); - - @WorkerThread - @NonNull List getAllJobSpecs(); - - @WorkerThread - @NonNull List getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime); - - @WorkerThread - @NonNull List getJobsInQueue(@NonNull String queue); - - @WorkerThread - int getJobCountForFactory(@NonNull String factoryKey); - - @WorkerThread - int getJobCountForFactoryAndQueue(@NonNull String factoryKey, @NonNull String queueKey); - - @WorkerThread - boolean areQueuesEmpty(@NonNull Set queueKeys); - - @WorkerThread - void markJobAsRunning(@NonNull String id, long currentTime); - - @WorkerThread - void updateJobAfterRetry(@NonNull String id, long currentTime, int runAttempt, long nextBackoffInterval, @Nullable byte[] serializedData); - - @WorkerThread - void updateAllJobsToBePending(); - - @WorkerThread - void updateJobs(@NonNull List jobSpecs); - - @WorkerThread - void deleteJob(@NonNull String id); - - @WorkerThread - void deleteJobs(@NonNull List ids); - - @WorkerThread - @NonNull List getConstraintSpecs(@NonNull String jobId); - - @WorkerThread - @NonNull List getAllConstraintSpecs(); - - @WorkerThread - @NonNull List getDependencySpecsThatDependOnJob(@NonNull String jobSpecId); - - @WorkerThread - @NonNull List getAllDependencySpecs(); -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt new file mode 100644 index 0000000000..6529a2ca6a --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.kt @@ -0,0 +1,69 @@ +package org.thoughtcrime.securesms.jobmanager.persistence + +import androidx.annotation.WorkerThread +import java.util.function.Predicate + +interface JobStorage { + @WorkerThread + fun init() + + @WorkerThread + fun insertJobs(fullSpecs: List) + + @WorkerThread + fun getJobSpec(id: String): JobSpec? + + @WorkerThread + fun getAllMatchingFilter(predicate: Predicate): List + + @WorkerThread + fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List + + @WorkerThread + fun getJobsInQueue(queue: String): List + + @WorkerThread + fun getJobCountForFactory(factoryKey: String): Int + + @WorkerThread + fun getJobCountForFactoryAndQueue(factoryKey: String, queueKey: String): Int + + @WorkerThread + fun areQueuesEmpty(queueKeys: Set): Boolean + + @WorkerThread + fun markJobAsRunning(id: String, currentTime: Long) + + @WorkerThread + fun updateJobAfterRetry(id: String, currentTime: Long, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?) + + @WorkerThread + fun updateAllJobsToBePending() + + @WorkerThread + fun updateJobs(jobSpecs: List) + + @WorkerThread + fun transformJobs(transformer: (JobSpec) -> JobSpec) + + @WorkerThread + fun deleteJob(id: String) + + @WorkerThread + fun deleteJobs(ids: List) + + @WorkerThread + fun getConstraintSpecs(jobId: String): List + + @WorkerThread + fun getDependencySpecsThatDependOnJob(jobSpecId: String): List + + @WorkerThread + fun debugGetJobSpecs(limit: Int): List + + @WorkerThread + fun debugGetConstraintSpecs(limit: Int): List + + @WorkerThread + fun debugGetAllDependencySpecs(): List +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt index e3f8e86873..4b7dfaa5b3 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt @@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage import org.thoughtcrime.securesms.util.LRUCache import java.util.TreeSet +import java.util.function.Predicate class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { @@ -17,23 +18,39 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { private const val JOB_CACHE_LIMIT = 1000 } + /** We keep a trimmed down version of every job in memory. */ + private val minimalJobs: MutableList = mutableListOf() + + /** + * We keep a set of job specs in memory to facilitate fast retrieval. This is important because the most common job storage pattern is + * [getPendingJobsWithNoDependenciesInCreatedOrder], which needs to return full specs. + */ private val jobSpecCache: LRUCache = LRUCache(JOB_CACHE_LIMIT) - private val jobs: MutableList = mutableListOf() + /** + * We keep a set of constraints in memory, seeded by the same jobs in the [jobSpecCache]. It doesn't need to necessarily stay in sync with that cache, though. + * The most important property to maintain is that if there's an entry in the map for a given jobId, we need to ensure we have _all_ of the constraints for + * that job. Important for [getConstraintSpecs]. + */ + private val constraintsByJobId: LRUCache> = LRUCache(JOB_CACHE_LIMIT) - // TODO [job] Rather than duplicate what is likely the same handful of constraints over and over, we should somehow re-use instances - private val constraintsByJobId: MutableMap> = mutableMapOf() - private val dependenciesByJobId: MutableMap> = mutableMapOf() + /** We keep every dependency in memory, since there aren't that many, and managing a limited subset would be very complicated. */ + private val dependenciesByJobId: MutableMap> = hashMapOf() + /** The list of jobs eligible to be returned from [getPendingJobsWithNoDependenciesInCreatedOrder], kept sorted in the appropriate order. */ private val eligibleJobs: TreeSet = TreeSet(EligibleJobComparator) + + /** All migration-related jobs, kept in the appropriate order. */ private val migrationJobs: TreeSet = TreeSet(compareBy { it.createTime }) + + /** We need a fast way to know what the "most eligible job" is for a given queue. This serves as a lookup table that speeds up the maintenance of [eligibleJobs]. */ private val mostEligibleJobForQueue: MutableMap = hashMapOf() @Synchronized override fun init() { - jobs += jobDatabase.getAllMinimalJobSpecs() + minimalJobs += jobDatabase.getAllMinimalJobSpecs() - for (job in jobs) { + for (job in minimalJobs) { if (job.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) { migrationJobs += job } else { @@ -41,11 +58,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } } - jobDatabase.getOldestJobSpecs(JOB_CACHE_LIMIT).forEach { + jobDatabase.getJobSpecs(JOB_CACHE_LIMIT).forEach { jobSpecCache[it.id] = it } - for (constraintSpec in jobDatabase.getAllConstraintSpecs()) { + for (constraintSpec in jobDatabase.getConstraintSpecsForJobs(jobSpecCache.keys)) { val jobConstraints: MutableList = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() } jobConstraints += constraintSpec } @@ -66,7 +83,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { for (fullSpec in fullSpecs) { val minimalJobSpec = fullSpec.jobSpec.toMinimalJobSpec() - jobs += minimalJobSpec + minimalJobs += minimalJobSpec jobSpecCache[fullSpec.jobSpec.id] = fullSpec.jobSpec if (fullSpec.jobSpec.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) { @@ -82,13 +99,12 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { @Synchronized override fun getJobSpec(id: String): JobSpec? { - return jobs.firstOrNull { it.id == id }?.toJobSpec() + return minimalJobs.firstOrNull { it.id == id }?.toJobSpec() } @Synchronized - override fun getAllJobSpecs(): List { - // TODO [job] this will have to change - return jobDatabase.getAllJobSpecs() + override fun getAllMatchingFilter(predicate: Predicate): List { + return jobDatabase.getAllMatchingFilter(predicate) } @Synchronized @@ -115,28 +131,28 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { @Synchronized override fun getJobsInQueue(queue: String): List { - return jobs + return minimalJobs .filter { it.queueKey == queue } .map { it.toJobSpec() } } @Synchronized override fun getJobCountForFactory(factoryKey: String): Int { - return jobs + return minimalJobs .filter { it.factoryKey == factoryKey } .size } @Synchronized override fun getJobCountForFactoryAndQueue(factoryKey: String, queueKey: String): Int { - return jobs + return minimalJobs .filter { it.factoryKey == factoryKey && it.queueKey == queueKey } .size } @Synchronized override fun areQueuesEmpty(queueKeys: Set): Boolean { - return jobs.none { it.queueKey != null && queueKeys.contains(it.queueKey) } + return minimalJobs.none { it.queueKey != null && queueKeys.contains(it.queueKey) } } @Synchronized @@ -229,13 +245,32 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } @Synchronized - override fun deleteJob(jobId: String) { - deleteJobs(listOf(jobId)) + override fun transformJobs(transformer: (JobSpec) -> JobSpec) { + val updated = jobDatabase.transformJobs(transformer) + for (update in updated) { + jobSpecCache[update.id] = update + } + + val iterator = minimalJobs.listIterator() + while (iterator.hasNext()) { + val current = iterator.next() + val updatedJob = updated.firstOrNull { it.id == current.id } + + if (updatedJob != null) { + iterator.set(updatedJob.toMinimalJobSpec()) + replaceJobInEligibleList(current, updatedJob.toMinimalJobSpec()) + } + } } @Synchronized - override fun deleteJobs(jobIds: List) { - val jobsToDelete: Set = jobIds + override fun deleteJob(id: String) { + deleteJobs(listOf(id)) + } + + @Synchronized + override fun deleteJobs(ids: List) { + val jobsToDelete: Set = ids .mapNotNull { getJobSpec(it) } .toSet() @@ -251,13 +286,13 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { jobDatabase.deleteJobs(durableJobIdsToDelete) } - val deleteIds: Set = jobIds.toSet() - jobs.removeIf { deleteIds.contains(it.id) } + val deleteIds: Set = ids.toSet() + minimalJobs.removeIf { deleteIds.contains(it.id) } jobSpecCache.keys.removeAll(deleteIds) eligibleJobs.removeAll(minimalJobsToDelete) migrationJobs.removeAll(minimalJobsToDelete) - for (jobId in jobIds) { + for (jobId in ids) { constraintsByJobId.remove(jobId) dependenciesByJobId.remove(jobId) @@ -275,12 +310,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { @Synchronized override fun getConstraintSpecs(jobId: String): List { - return constraintsByJobId.getOrElse(jobId) { listOf() } - } - - @Synchronized - override fun getAllConstraintSpecs(): List { - return constraintsByJobId.values.flatten() + return constraintsByJobId.getOrPut(jobId) { + jobDatabase.getConstraintSpecsForJobs(listOf(jobId)).toMutableList() + } } @Synchronized @@ -301,12 +333,22 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } @Synchronized - override fun getAllDependencySpecs(): List { + override fun debugGetJobSpecs(limit: Int): List { + return jobDatabase.getJobSpecs(limit) + } + + @Synchronized + override fun debugGetConstraintSpecs(limit: Int): List { + return jobDatabase.getConstraintSpecs(limit) + } + + @Synchronized + override fun debugGetAllDependencySpecs(): List { return dependenciesByJobId.values.flatten() } private fun updateCachedJobSpecs(filter: (MinimalJobSpec) -> Boolean, transformer: (MinimalJobSpec) -> MinimalJobSpec, singleUpdate: Boolean = false) { - val iterator = jobs.listIterator() + val iterator = minimalJobs.listIterator() while (iterator.hasNext()) { val current = iterator.next() @@ -329,10 +371,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { isMemoryOnly = updated.isMemoryOnly ) jobSpecCache[updatedJobSpec.id] = updatedJobSpec - } - if (singleUpdate) { - return + if (singleUpdate) { + return + } } } } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java index ffd3c47797..f8a4a61ae1 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java +++ b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java @@ -12,8 +12,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import kotlin.jvm.functions.Function1; + import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -88,7 +92,14 @@ public class JobMigratorTest { private static JobStorage simpleJobStorage() { JobStorage jobStorage = mock(JobStorage.class); - when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false, 0)))); + JobSpec job = new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false, 0); + + when(jobStorage.debugGetJobSpecs(anyInt())).thenReturn(new ArrayList<>(Collections.singletonList(job))); + doAnswer(invocation -> { + Function1 transformer = invocation.getArgument(0); + return transformer.invoke(job); + }).when(jobStorage).transformJobs(any()); + return jobStorage; } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt index bc9a904101..b93f3d64b4 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt @@ -20,9 +20,9 @@ class FastJobStorageTest { val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS)) subject.init() - DataSet1.assertJobsMatch(subject.allJobSpecs) - DataSet1.assertConstraintsMatch(subject.allConstraintSpecs) - DataSet1.assertDependenciesMatch(subject.allDependencySpecs) + DataSet1.assertJobsMatch(subject.debugGetJobSpecs(1000)) + DataSet1.assertConstraintsMatch(subject.debugGetConstraintSpecs(1000)) + DataSet1.assertDependenciesMatch(subject.debugGetAllDependencySpecs()) } @Test @@ -30,9 +30,9 @@ class FastJobStorageTest { val subject = FastJobStorage(mockDatabase(DataSetCircularDependency.FULL_SPECS)) subject.init() - DataSetCircularDependency.assertJobsMatch(subject.allJobSpecs) - DataSetCircularDependency.assertConstraintsMatch(subject.allConstraintSpecs) - DataSetCircularDependency.assertDependenciesMatch(subject.allDependencySpecs) + DataSetCircularDependency.assertJobsMatch(subject.debugGetJobSpecs(1000)) + DataSetCircularDependency.assertConstraintsMatch(subject.debugGetConstraintSpecs(1000)) + DataSetCircularDependency.assertDependenciesMatch(subject.debugGetAllDependencySpecs()) } @Test @@ -59,9 +59,9 @@ class FastJobStorageTest { fun `insertJobs - data can be found`() { val subject = FastJobStorage(mockDatabase()) subject.insertJobs(DataSet1.FULL_SPECS) - DataSet1.assertJobsMatch(subject.allJobSpecs) - DataSet1.assertConstraintsMatch(subject.allConstraintSpecs) - DataSet1.assertDependenciesMatch(subject.allDependencySpecs) + DataSet1.assertJobsMatch(subject.debugGetJobSpecs(1000)) + DataSet1.assertConstraintsMatch(subject.debugGetConstraintSpecs(1000)) + DataSet1.assertDependenciesMatch(subject.debugGetAllDependencySpecs()) } @Test @@ -164,6 +164,71 @@ class FastJobStorageTest { subject.getJobSpec("3") assertIs fullSpec3.jobSpec } + @Test + fun `transformJobs - writes to database`() { + val database = mockDatabase(DataSet1.FULL_SPECS) + + val subject = FastJobStorage(database) + subject.init() + val transformer: (JobSpec) -> JobSpec = { it } + subject.transformJobs(transformer) + + verify { database.transformJobs(transformer) } + } + + @Test + fun `transformJobs - updates all fields`() { + val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1"), emptyList(), emptyList()) + val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2"), emptyList(), emptyList()) + val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3"), emptyList(), emptyList()) + + val update1 = jobSpec( + id = "1", + factoryKey = "g1", + queueKey = "q1", + createTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 2, + runAttempt = 2, + maxAttempts = 2, + lifespan = 2, + serializedData = "abc".toByteArray(), + serializedInputData = null, + isRunning = true, + isMemoryOnly = false + ) + val update2 = jobSpec( + id = "2", + factoryKey = "g2", + queueKey = "q2", + createTime = 3, + lastRunAttemptTime = 3, + nextBackoffInterval = 3, + runAttempt = 3, + maxAttempts = 3, + lifespan = 3, + serializedData = "def".toByteArray(), + serializedInputData = "ghi".toByteArray(), + isRunning = true, + isMemoryOnly = false + ) + + val subject = FastJobStorage(mockDatabase(listOf(fullSpec1, fullSpec2, fullSpec3))) + subject.init() + + subject.transformJobs { + when (it.id) { + "1" -> update1 + "2" -> update2 + else -> it + } + } + + subject.getJobSpec("1") assertIs update1 + subject.getJobSpec("2") assertIs update2 + subject.getJobSpec("3") assertIs fullSpec3.jobSpec + } + @Test fun `markJobAsRunning - writes to database`() { val database = mockDatabase(DataSet1.FULL_SPECS) @@ -614,9 +679,9 @@ class FastJobStorageTest { subject.deleteJobs(listOf("id1")) - val jobs = subject.allJobSpecs - val constraints = subject.allConstraintSpecs - val dependencies = subject.allDependencySpecs + val jobs = subject.debugGetJobSpecs(1000) + val constraints = subject.debugGetConstraintSpecs(1000) + val dependencies = subject.debugGetAllDependencySpecs() jobs.size assertIs 2 jobs[0] assertIs DataSet1.JOB_2 @@ -727,11 +792,11 @@ class FastJobStorageTest { val dependencies = fullSpecs.map { it.dependencySpecs }.flatten().toMutableList() val mock = mockk(relaxed = true) - every { mock.getAllJobSpecs() } returns jobs + every { mock.getJobSpecs(any()) } returns jobs every { mock.getAllMinimalJobSpecs() } returns jobs.map { it.toMinimalJobSpec() } - every { mock.getOldestJobSpecs(any()) } answers { jobs.sortedBy { it.createTime }.take(firstArg()) } - every { mock.getAllConstraintSpecs() } returns constraints + every { mock.getConstraintSpecs(any()) } returns constraints every { mock.getAllDependencySpecs() } returns dependencies + every { mock.getConstraintSpecsForJobs(any()) } returns constraints every { mock.getJobSpec(any()) } answers { jobs.first { it.id == firstArg() } } every { mock.insertJobs(any()) } answers { val inserts: List = firstArg() @@ -754,6 +819,20 @@ class FastJobStorageTest { jobs += update } } + every { mock.transformJobs(any()) } answers { + val transformer: (JobSpec) -> JobSpec = firstArg() + val iterator = jobs.listIterator() + val out = mutableListOf() + while (iterator.hasNext()) { + val current = iterator.next() + val updated = transformer(current) + iterator.set(transformer(current)) + if (current != updated) { + out += updated + } + } + out + } every { mock.updateAllJobsToBePending() } answers { val iterator = jobs.listIterator() while (iterator.hasNext()) { diff --git a/core-util/src/main/java/org/signal/core/util/SQLiteDatabaseExtensions.kt b/core-util/src/main/java/org/signal/core/util/SQLiteDatabaseExtensions.kt index 78b61396f4..bc28eb65c6 100644 --- a/core-util/src/main/java/org/signal/core/util/SQLiteDatabaseExtensions.kt +++ b/core-util/src/main/java/org/signal/core/util/SQLiteDatabaseExtensions.kt @@ -206,6 +206,10 @@ class SelectBuilderPart2( return SelectBuilderPart4a(db, columns, tableName, "", arrayOf(), orderBy) } + fun limit(limit: Int): SelectBuilderPart4b { + return SelectBuilderPart4b(db, columns, tableName, "", arrayOf(), limit.toString()) + } + fun run(): Cursor { return db.query( SupportSQLiteQueryBuilder