Remove cases where all jobs were expected to be in memory.

This commit is contained in:
Greyson Parrelli
2024-07-17 11:58:09 -04:00
committed by Nicholas Tinsley
parent 973dc72cfa
commit 86cf8200b5
10 changed files with 422 additions and 239 deletions
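In practice, the commit pushes filtering and bulk updates down into the storage layer: call sites that previously loaded every JobSpec with getAllJobSpecs() and filtered or rewrote the list in memory now pass a predicate to getAllMatchingFilter() or a transformer to transformJobs(). A rough caller-side sketch of the shift, for illustration only (`storage`, `updater`, and the "PushSendJob" factory key are placeholders, not code from this commit):

// Before: materialize every job spec, then filter in memory.
val sendJobsBefore = storage.getAllJobSpecs().filter { it.factoryKey == "PushSendJob" }

// After: the predicate is applied while reading from the database.
val sendJobsAfter = storage.getAllMatchingFilter { it.factoryKey == "PushSendJob" }

// Bulk rewrites follow the same pattern: only the specs the transformer actually
// changes are persisted, instead of calling updateJobs() on a fully loaded list.
storage.transformJobs { job -> updater.update(job) }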


@@ -8,8 +8,10 @@ import androidx.core.content.contentValuesOf
import net.zetetic.database.sqlcipher.SQLiteDatabase
import net.zetetic.database.sqlcipher.SQLiteOpenHelper
import org.signal.core.util.CursorUtil
import org.signal.core.util.SqlUtil
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.delete
import org.signal.core.util.forEach
import org.signal.core.util.insertInto
import org.signal.core.util.logging.Log
import org.signal.core.util.readToList
@@ -31,6 +33,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobs.MinimalJobSpec
import java.util.function.Predicate
class JobDatabase(
application: Application,
@@ -184,28 +187,32 @@ class JobDatabase(
}
@Synchronized
fun getAllJobSpecs(): List<JobSpec> {
return readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.orderBy("${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
.run()
.readToList { cursor ->
jobSpecFromCursor(cursor)
}
}
@Synchronized
fun getOldestJobSpecs(limit: Int): List<JobSpec> {
fun getJobSpecs(limit: Int): List<JobSpec> {
return readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.orderBy("${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
.limit(limit)
.run()
.readToList { it.toJobSpec() }
}
@Synchronized
fun getAllMatchingFilter(predicate: Predicate<JobSpec>): List<JobSpec> {
val output: MutableList<JobSpec> = mutableListOf()
readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.run()
.readToList { cursor ->
jobSpecFromCursor(cursor)
val jobSpec = cursor.toJobSpec()
if (predicate.test(jobSpec)) {
output += jobSpec
}
}
return output
}
@Synchronized
@@ -215,9 +222,7 @@ class JobDatabase(
.from(Jobs.TABLE_NAME)
.where("${Jobs.JOB_SPEC_ID} = ?", id)
.run()
.readToSingleObject {
jobSpecFromCursor(it)
}
.readToSingleObject { it.toJobSpec() }
}
@Synchronized
@@ -296,26 +301,41 @@ class JobDatabase(
.filterNot { it.isMemoryOnly }
.forEach { job ->
db.update(Jobs.TABLE_NAME)
.values(
Jobs.JOB_SPEC_ID to job.id,
Jobs.FACTORY_KEY to job.factoryKey,
Jobs.QUEUE_KEY to job.queueKey,
Jobs.CREATE_TIME to job.createTime,
Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime,
Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval,
Jobs.RUN_ATTEMPT to job.runAttempt,
Jobs.MAX_ATTEMPTS to job.maxAttempts,
Jobs.LIFESPAN to job.lifespan,
Jobs.SERIALIZED_DATA to job.serializedData,
Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData,
Jobs.IS_RUNNING to if (job.isRunning) 1 else 0
)
.values(job.toContentValues())
.where("${Jobs.JOB_SPEC_ID} = ?", job.id)
.run()
}
}
}
@Synchronized
fun transformJobs(transformer: (JobSpec) -> JobSpec): List<JobSpec> {
val transformed: MutableList<JobSpec> = mutableListOf()
writableDatabase.withinTransaction { db ->
readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.run()
.forEach { cursor ->
val jobSpec = cursor.toJobSpec()
val updated = transformer(jobSpec)
if (updated != jobSpec) {
transformed += updated
}
}
for (job in transformed) {
db.update(Jobs.TABLE_NAME)
.values(job.toContentValues())
.where("${Jobs.JOB_SPEC_ID} = ?", job.id)
.run()
}
}
return transformed
}
@Synchronized
fun deleteJobs(jobIds: List<String>) {
writableDatabase.withinTransaction { db ->
@@ -340,14 +360,30 @@ class JobDatabase(
}
@Synchronized
fun getAllConstraintSpecs(): List<ConstraintSpec> {
fun getConstraintSpecs(limit: Int): List<ConstraintSpec> {
return readableDatabase
.select()
.from(Constraints.TABLE_NAME)
.limit(limit)
.run()
.readToList { cursor ->
constraintSpecFromCursor(cursor)
}
.readToList { it.toConstraintSpec() }
}
fun getConstraintSpecsForJobs(jobIds: Collection<String>): List<ConstraintSpec> {
val output: MutableList<ConstraintSpec> = mutableListOf()
for (query in SqlUtil.buildCollectionQuery(Constraints.JOB_SPEC_ID, jobIds)) {
readableDatabase
.select()
.from(Constraints.TABLE_NAME)
.where(query.where, query.whereArgs)
.run()
.forEach {
output += it.toConstraintSpec()
}
}
return output
}
@Synchronized
@@ -356,9 +392,7 @@ class JobDatabase(
.select()
.from(Dependencies.TABLE_NAME)
.run()
.readToList { cursor ->
dependencySpecFromCursor(cursor)
}
.readToList { it.toDependencySpec() }
}
private fun insertJobSpec(db: SQLiteDatabase, job: JobSpec) {
@@ -369,21 +403,7 @@ class JobDatabase(
check(db.inTransaction())
db.insertInto(Jobs.TABLE_NAME)
.values(
Jobs.JOB_SPEC_ID to job.id,
Jobs.FACTORY_KEY to job.factoryKey,
Jobs.QUEUE_KEY to job.queueKey,
Jobs.CREATE_TIME to job.createTime,
Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime,
Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval,
Jobs.RUN_ATTEMPT to job.runAttempt,
Jobs.MAX_ATTEMPTS to job.maxAttempts,
Jobs.LIFESPAN to job.lifespan,
Jobs.SERIALIZED_DATA to job.serializedData,
Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData,
Jobs.IS_RUNNING to if (job.isRunning) 1 else 0,
Jobs.PRIORITY to job.priority
)
.values(job.toContentValues())
.run(SQLiteDatabase.CONFLICT_IGNORE)
}
@@ -417,37 +437,37 @@ class JobDatabase(
}
}
private fun jobSpecFromCursor(cursor: Cursor): JobSpec {
private fun Cursor.toJobSpec(): JobSpec {
return JobSpec(
id = cursor.requireNonNullString(Jobs.JOB_SPEC_ID),
factoryKey = cursor.requireNonNullString(Jobs.FACTORY_KEY),
queueKey = cursor.requireString(Jobs.QUEUE_KEY),
createTime = cursor.requireLong(Jobs.CREATE_TIME),
lastRunAttemptTime = cursor.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME),
nextBackoffInterval = cursor.requireLong(Jobs.NEXT_BACKOFF_INTERVAL),
runAttempt = cursor.requireInt(Jobs.RUN_ATTEMPT),
maxAttempts = cursor.requireInt(Jobs.MAX_ATTEMPTS),
lifespan = cursor.requireLong(Jobs.LIFESPAN),
serializedData = cursor.requireBlob(Jobs.SERIALIZED_DATA),
serializedInputData = cursor.requireBlob(Jobs.SERIALIZED_INPUT_DATA),
isRunning = cursor.requireBoolean(Jobs.IS_RUNNING),
id = this.requireNonNullString(Jobs.JOB_SPEC_ID),
factoryKey = this.requireNonNullString(Jobs.FACTORY_KEY),
queueKey = this.requireString(Jobs.QUEUE_KEY),
createTime = this.requireLong(Jobs.CREATE_TIME),
lastRunAttemptTime = this.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME),
nextBackoffInterval = this.requireLong(Jobs.NEXT_BACKOFF_INTERVAL),
runAttempt = this.requireInt(Jobs.RUN_ATTEMPT),
maxAttempts = this.requireInt(Jobs.MAX_ATTEMPTS),
lifespan = this.requireLong(Jobs.LIFESPAN),
serializedData = this.requireBlob(Jobs.SERIALIZED_DATA),
serializedInputData = this.requireBlob(Jobs.SERIALIZED_INPUT_DATA),
isRunning = this.requireBoolean(Jobs.IS_RUNNING),
isMemoryOnly = false,
priority = cursor.requireInt(Jobs.PRIORITY)
priority = this.requireInt(Jobs.PRIORITY)
)
}
private fun constraintSpecFromCursor(cursor: Cursor): ConstraintSpec {
private fun Cursor.toConstraintSpec(): ConstraintSpec {
return ConstraintSpec(
jobSpecId = cursor.requireNonNullString(Constraints.JOB_SPEC_ID),
factoryKey = cursor.requireNonNullString(Constraints.FACTORY_KEY),
jobSpecId = this.requireNonNullString(Constraints.JOB_SPEC_ID),
factoryKey = this.requireNonNullString(Constraints.FACTORY_KEY),
isMemoryOnly = false
)
}
private fun dependencySpecFromCursor(cursor: Cursor): DependencySpec {
private fun Cursor.toDependencySpec(): DependencySpec {
return DependencySpec(
jobId = cursor.requireNonNullString(Dependencies.JOB_SPEC_ID),
dependsOnJobId = cursor.requireNonNullString(Dependencies.DEPENDS_ON_JOB_SPEC_ID),
jobId = this.requireNonNullString(Dependencies.JOB_SPEC_ID),
dependsOnJobId = this.requireNonNullString(Dependencies.DEPENDS_ON_JOB_SPEC_ID),
isMemoryOnly = false
)
}
@@ -468,6 +488,24 @@ class JobDatabase(
writableDatabase.update(Jobs.TABLE_NAME, contentValuesOf(Jobs.NEXT_BACKOFF_INTERVAL to 0), null, null)
}
private fun JobSpec.toContentValues(): ContentValues {
return contentValuesOf(
Jobs.JOB_SPEC_ID to this.id,
Jobs.FACTORY_KEY to this.factoryKey,
Jobs.QUEUE_KEY to this.queueKey,
Jobs.CREATE_TIME to this.createTime,
Jobs.LAST_RUN_ATTEMPT_TIME to this.lastRunAttemptTime,
Jobs.NEXT_BACKOFF_INTERVAL to this.nextBackoffInterval,
Jobs.RUN_ATTEMPT to this.runAttempt,
Jobs.MAX_ATTEMPTS to this.maxAttempts,
Jobs.LIFESPAN to this.lifespan,
Jobs.SERIALIZED_DATA to this.serializedData,
Jobs.SERIALIZED_INPUT_DATA to this.serializedInputData,
Jobs.IS_RUNNING to if (this.isRunning) 1 else 0,
Jobs.PRIORITY to this.priority
)
}
companion object {
private val TAG = Log.tag(JobDatabase::class.java)
private const val DATABASE_VERSION = 3
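Both insertJobSpec() and updateJobs() now build their row values through the shared JobSpec.toContentValues() helper, and the new transformJobs() reads every stored spec inside a single transaction, writes back only the rows whose spec actually changed, and returns that changed subset so callers can refresh their in-memory caches. A hedged usage sketch (the backoff-reset transformation is invented for illustration, and it assumes JobSpec is a Kotlin data class with copy(), which this diff doesn't show):

// Clear any pending backoff on every persisted job; unchanged specs are skipped.
val changed: List<JobSpec> = jobDatabase.transformJobs { spec ->
  if (spec.nextBackoffInterval > 0) {
    spec.copy(nextBackoffInterval = 0)
  } else {
    spec // returning an unchanged spec means this row is not rewritten
  }
}
// `changed` holds only the rewritten specs, e.g. for updating a cache keyed by spec.id.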


@@ -231,26 +231,13 @@ class JobController {
@WorkerThread
synchronized void update(@NonNull JobUpdater updater) {
List<JobSpec> allJobs = jobStorage.getAllJobSpecs();
List<JobSpec> updatedJobs = new LinkedList<>();
for (JobSpec job : allJobs) {
JobSpec updated = updater.update(job);
if (updated != job) {
updatedJobs.add(updated);
}
}
jobStorage.updateJobs(updatedJobs);
jobStorage.transformJobs(updater::update);
notifyAll();
}
@WorkerThread
synchronized List<JobSpec> findJobs(@NonNull Predicate<JobSpec> predicate) {
return Stream.of(jobStorage.getAllJobSpecs())
.filter(predicate::test)
.toList();
return jobStorage.getAllMatchingFilter(predicate);
}
@WorkerThread
@@ -360,9 +347,9 @@ class JobController {
*/
@WorkerThread
synchronized @NonNull String getDebugInfo() {
List<JobSpec> jobs = jobStorage.getAllJobSpecs();
List<ConstraintSpec> constraints = jobStorage.getAllConstraintSpecs();
List<DependencySpec> dependencies = jobStorage.getAllDependencySpecs();
List<JobSpec> jobs = jobStorage.debugGetJobSpecs(1000);
List<ConstraintSpec> constraints = jobStorage.debugGetConstraintSpecs(1000);
List<DependencySpec> dependencies = jobStorage.debugGetAllDependencySpecs();
StringBuilder info = new StringBuilder();


@@ -33,6 +33,33 @@ abstract class JobMigration protected constructor(val endVersion: Int) {
return copy(data = newData)
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as JobData
if (factoryKey != other.factoryKey) return false
if (queueKey != other.queueKey) return false
if (maxAttempts != other.maxAttempts) return false
if (lifespan != other.lifespan) return false
if (data != null) {
if (other.data == null) return false
if (!data.contentEquals(other.data)) return false
} else if (other.data != null) return false
return true
}
override fun hashCode(): Int {
var result = factoryKey.hashCode()
result = 31 * result + (queueKey?.hashCode() ?: 0)
result = 31 * result + maxAttempts
result = 31 * result + lifespan.hashCode()
result = 31 * result + (data?.contentHashCode() ?: 0)
return result
}
companion object {
@JvmField
val FAILING_JOB_DATA = JobData("FailingJob", null, -1, -1, null)
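The hand-written equals()/hashCode() above exist because `data` is a ByteArray: without them, either the default reference equality or a data-class-generated equals() would compare the array by reference, so two JobData instances holding identical bytes would not be equal under Kotlin's ==. The overrides use contentEquals()/contentHashCode() so equality follows the bytes. A tiny standalone illustration (not part of the commit):

val a = JobData("FailingJob", null, -1, -1, byteArrayOf(1, 2, 3))
val b = JobData("FailingJob", null, -1, -1, byteArrayOf(1, 2, 3))
check(a == b)                       // true with the content-based equals above
check(a.hashCode() == b.hashCode()) // hashCode stays consistent with equals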


@@ -11,7 +11,6 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@SuppressLint("UseSparseArrays")
@@ -47,41 +46,36 @@ public class JobMigrator {
* @return The version that has been migrated to.
*/
int migrate(@NonNull JobStorage jobStorage) {
List<JobSpec> jobSpecs = jobStorage.getAllJobSpecs();
for (int i = lastSeenVersion; i < currentVersion; i++) {
Log.i(TAG, "Migrating from " + i + " to " + (i + 1));
ListIterator<JobSpec> iter = jobSpecs.listIterator();
JobMigration migration = migrations.get(i + 1);
JobMigration migration = migrations.get(i + 1);
assert migration != null;
while (iter.hasNext()) {
JobSpec jobSpec = iter.next();
JobData originalJobData = new JobData(jobSpec.getFactoryKey(), jobSpec.getQueueKey(), jobSpec.getMaxAttempts(), jobSpec.getLifespan(), jobSpec.getSerializedData());
JobData updatedJobData = migration.migrate(originalJobData);
JobSpec updatedJobSpec = new JobSpec(jobSpec.getId(),
updatedJobData.getFactoryKey(),
updatedJobData.getQueueKey(),
jobSpec.getCreateTime(),
jobSpec.getLastRunAttemptTime(),
jobSpec.getNextBackoffInterval(),
jobSpec.getRunAttempt(),
updatedJobData.getMaxAttempts(),
updatedJobData.getLifespan(),
updatedJobData.getData(),
jobSpec.getSerializedInputData(),
jobSpec.isRunning(),
jobSpec.isMemoryOnly(),
jobSpec.getPriority());
jobStorage.transformJobs(jobSpec -> {
JobData originalJobData = new JobData(jobSpec.getFactoryKey(), jobSpec.getQueueKey(), jobSpec.getMaxAttempts(), jobSpec.getLifespan(), jobSpec.getSerializedData());
JobData updatedJobData = migration.migrate(originalJobData);
iter.set(updatedJobSpec);
}
if (updatedJobData == originalJobData) {
return jobSpec;
}
return new JobSpec(jobSpec.getId(),
updatedJobData.getFactoryKey(),
updatedJobData.getQueueKey(),
jobSpec.getCreateTime(),
jobSpec.getLastRunAttemptTime(),
jobSpec.getNextBackoffInterval(),
jobSpec.getRunAttempt(),
updatedJobData.getMaxAttempts(),
updatedJobData.getLifespan(),
updatedJobData.getData(),
jobSpec.getSerializedInputData(),
jobSpec.isRunning(),
jobSpec.isMemoryOnly(),
jobSpec.getPriority());
});
}
jobStorage.updateJobs(jobSpecs);
return currentVersion;
}
}
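A migration step now streams job-by-job through jobStorage.transformJobs(): migrate() receives each job's JobData, and only jobs whose JobData comes back changed are rebuilt and persisted. A hedged sketch of what an individual migration looks like under that contract ("OldJob"/"NewJob" and the end version are placeholders, and the copy() call assumes JobData is a Kotlin data class, consistent with the copy(data = ...) seen earlier in this commit):

class RenameOldJobMigration : JobMigration(endVersion = 5) {
  override fun migrate(jobData: JobMigration.JobData): JobMigration.JobData {
    return if (jobData.factoryKey == "OldJob") {
      jobData.copy(factoryKey = "NewJob") // changed data -> the spec is rewritten
    } else {
      jobData // unchanged -> transformJobs leaves this spec untouched
    }
  }
}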


@@ -1,68 +0,0 @@
package org.thoughtcrime.securesms.jobmanager.persistence;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.WorkerThread;
import java.util.List;
import java.util.Set;
public interface JobStorage {
@WorkerThread
void init();
@WorkerThread
void insertJobs(@NonNull List<FullSpec> fullSpecs);
@WorkerThread
@Nullable JobSpec getJobSpec(@NonNull String id);
@WorkerThread
@NonNull List<JobSpec> getAllJobSpecs();
@WorkerThread
@NonNull List<JobSpec> getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime);
@WorkerThread
@NonNull List<JobSpec> getJobsInQueue(@NonNull String queue);
@WorkerThread
int getJobCountForFactory(@NonNull String factoryKey);
@WorkerThread
int getJobCountForFactoryAndQueue(@NonNull String factoryKey, @NonNull String queueKey);
@WorkerThread
boolean areQueuesEmpty(@NonNull Set<String> queueKeys);
@WorkerThread
void markJobAsRunning(@NonNull String id, long currentTime);
@WorkerThread
void updateJobAfterRetry(@NonNull String id, long currentTime, int runAttempt, long nextBackoffInterval, @Nullable byte[] serializedData);
@WorkerThread
void updateAllJobsToBePending();
@WorkerThread
void updateJobs(@NonNull List<JobSpec> jobSpecs);
@WorkerThread
void deleteJob(@NonNull String id);
@WorkerThread
void deleteJobs(@NonNull List<String> ids);
@WorkerThread
@NonNull List<ConstraintSpec> getConstraintSpecs(@NonNull String jobId);
@WorkerThread
@NonNull List<ConstraintSpec> getAllConstraintSpecs();
@WorkerThread
@NonNull List<DependencySpec> getDependencySpecsThatDependOnJob(@NonNull String jobSpecId);
@WorkerThread
@NonNull List<DependencySpec> getAllDependencySpecs();
}


@@ -0,0 +1,69 @@
package org.thoughtcrime.securesms.jobmanager.persistence
import androidx.annotation.WorkerThread
import java.util.function.Predicate
interface JobStorage {
@WorkerThread
fun init()
@WorkerThread
fun insertJobs(fullSpecs: List<FullSpec>)
@WorkerThread
fun getJobSpec(id: String): JobSpec?
@WorkerThread
fun getAllMatchingFilter(predicate: Predicate<JobSpec>): List<JobSpec>
@WorkerThread
fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec>
@WorkerThread
fun getJobsInQueue(queue: String): List<JobSpec>
@WorkerThread
fun getJobCountForFactory(factoryKey: String): Int
@WorkerThread
fun getJobCountForFactoryAndQueue(factoryKey: String, queueKey: String): Int
@WorkerThread
fun areQueuesEmpty(queueKeys: Set<String>): Boolean
@WorkerThread
fun markJobAsRunning(id: String, currentTime: Long)
@WorkerThread
fun updateJobAfterRetry(id: String, currentTime: Long, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?)
@WorkerThread
fun updateAllJobsToBePending()
@WorkerThread
fun updateJobs(jobSpecs: List<JobSpec>)
@WorkerThread
fun transformJobs(transformer: (JobSpec) -> JobSpec)
@WorkerThread
fun deleteJob(id: String)
@WorkerThread
fun deleteJobs(ids: List<String>)
@WorkerThread
fun getConstraintSpecs(jobId: String): List<ConstraintSpec>
@WorkerThread
fun getDependencySpecsThatDependOnJob(jobSpecId: String): List<DependencySpec>
@WorkerThread
fun debugGetJobSpecs(limit: Int): List<JobSpec>
@WorkerThread
fun debugGetConstraintSpecs(limit: Int): List<ConstraintSpec>
@WorkerThread
fun debugGetAllDependencySpecs(): List<DependencySpec>
}


@@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage
import org.thoughtcrime.securesms.util.LRUCache
import java.util.TreeSet
import java.util.function.Predicate
class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@@ -17,23 +18,39 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
private const val JOB_CACHE_LIMIT = 1000
}
/** We keep a trimmed down version of every job in memory. */
private val minimalJobs: MutableList<MinimalJobSpec> = mutableListOf()
/**
* We keep a set of job specs in memory to facilitate fast retrieval. This is important because the most common job storage pattern is
* [getPendingJobsWithNoDependenciesInCreatedOrder], which needs to return full specs.
*/
private val jobSpecCache: LRUCache<String, JobSpec> = LRUCache(JOB_CACHE_LIMIT)
private val jobs: MutableList<MinimalJobSpec> = mutableListOf()
/**
* We keep a set of constraints in memory, seeded with the same jobs as the [jobSpecCache]. It doesn't necessarily need to stay in sync with that cache, though.
* The important invariant is that if there's an entry in the map for a given jobId, it contains _all_ of the constraints for that job. This matters for
* [getConstraintSpecs].
*/
private val constraintsByJobId: LRUCache<String, MutableList<ConstraintSpec>> = LRUCache(JOB_CACHE_LIMIT)
// TODO [job] Rather than duplicate what is likely the same handful of constraints over and over, we should somehow re-use instances
private val constraintsByJobId: MutableMap<String, MutableList<ConstraintSpec>> = mutableMapOf()
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = mutableMapOf()
/** We keep every dependency in memory, since there aren't that many, and managing a limited subset would be very complicated. */
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = hashMapOf()
/** The list of jobs eligible to be returned from [getPendingJobsWithNoDependenciesInCreatedOrder], kept sorted in the appropriate order. */
private val eligibleJobs: TreeSet<MinimalJobSpec> = TreeSet(EligibleJobComparator)
/** All migration-related jobs, kept in the appropriate order. */
private val migrationJobs: TreeSet<MinimalJobSpec> = TreeSet(compareBy { it.createTime })
/** We need a fast way to know what the "most eligible job" is for a given queue. This serves as a lookup table that speeds up the maintenance of [eligibleJobs]. */
private val mostEligibleJobForQueue: MutableMap<String, MinimalJobSpec> = hashMapOf()
@Synchronized
override fun init() {
jobs += jobDatabase.getAllMinimalJobSpecs()
minimalJobs += jobDatabase.getAllMinimalJobSpecs()
for (job in jobs) {
for (job in minimalJobs) {
if (job.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
migrationJobs += job
} else {
@@ -41,11 +58,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
jobDatabase.getOldestJobSpecs(JOB_CACHE_LIMIT).forEach {
jobDatabase.getJobSpecs(JOB_CACHE_LIMIT).forEach {
jobSpecCache[it.id] = it
}
for (constraintSpec in jobDatabase.getAllConstraintSpecs()) {
for (constraintSpec in jobDatabase.getConstraintSpecsForJobs(jobSpecCache.keys)) {
val jobConstraints: MutableList<ConstraintSpec> = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() }
jobConstraints += constraintSpec
}
@@ -66,7 +83,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
for (fullSpec in fullSpecs) {
val minimalJobSpec = fullSpec.jobSpec.toMinimalJobSpec()
jobs += minimalJobSpec
minimalJobs += minimalJobSpec
jobSpecCache[fullSpec.jobSpec.id] = fullSpec.jobSpec
if (fullSpec.jobSpec.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
@@ -82,13 +99,12 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getJobSpec(id: String): JobSpec? {
return jobs.firstOrNull { it.id == id }?.toJobSpec()
return minimalJobs.firstOrNull { it.id == id }?.toJobSpec()
}
@Synchronized
override fun getAllJobSpecs(): List<JobSpec> {
// TODO [job] this will have to change
return jobDatabase.getAllJobSpecs()
override fun getAllMatchingFilter(predicate: Predicate<JobSpec>): List<JobSpec> {
return jobDatabase.getAllMatchingFilter(predicate)
}
@Synchronized
@@ -115,28 +131,28 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getJobsInQueue(queue: String): List<JobSpec> {
return jobs
return minimalJobs
.filter { it.queueKey == queue }
.map { it.toJobSpec() }
}
@Synchronized
override fun getJobCountForFactory(factoryKey: String): Int {
return jobs
return minimalJobs
.filter { it.factoryKey == factoryKey }
.size
}
@Synchronized
override fun getJobCountForFactoryAndQueue(factoryKey: String, queueKey: String): Int {
return jobs
return minimalJobs
.filter { it.factoryKey == factoryKey && it.queueKey == queueKey }
.size
}
@Synchronized
override fun areQueuesEmpty(queueKeys: Set<String>): Boolean {
return jobs.none { it.queueKey != null && queueKeys.contains(it.queueKey) }
return minimalJobs.none { it.queueKey != null && queueKeys.contains(it.queueKey) }
}
@Synchronized
@@ -229,13 +245,32 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
@Synchronized
override fun deleteJob(jobId: String) {
deleteJobs(listOf(jobId))
override fun transformJobs(transformer: (JobSpec) -> JobSpec) {
val updated = jobDatabase.transformJobs(transformer)
for (update in updated) {
jobSpecCache[update.id] = update
}
val iterator = minimalJobs.listIterator()
while (iterator.hasNext()) {
val current = iterator.next()
val updatedJob = updated.firstOrNull { it.id == current.id }
if (updatedJob != null) {
iterator.set(updatedJob.toMinimalJobSpec())
replaceJobInEligibleList(current, updatedJob.toMinimalJobSpec())
}
}
}
@Synchronized
override fun deleteJobs(jobIds: List<String>) {
val jobsToDelete: Set<JobSpec> = jobIds
override fun deleteJob(id: String) {
deleteJobs(listOf(id))
}
@Synchronized
override fun deleteJobs(ids: List<String>) {
val jobsToDelete: Set<JobSpec> = ids
.mapNotNull { getJobSpec(it) }
.toSet()
@@ -251,13 +286,13 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
jobDatabase.deleteJobs(durableJobIdsToDelete)
}
val deleteIds: Set<String> = jobIds.toSet()
jobs.removeIf { deleteIds.contains(it.id) }
val deleteIds: Set<String> = ids.toSet()
minimalJobs.removeIf { deleteIds.contains(it.id) }
jobSpecCache.keys.removeAll(deleteIds)
eligibleJobs.removeAll(minimalJobsToDelete)
migrationJobs.removeAll(minimalJobsToDelete)
for (jobId in jobIds) {
for (jobId in ids) {
constraintsByJobId.remove(jobId)
dependenciesByJobId.remove(jobId)
@@ -275,12 +310,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getConstraintSpecs(jobId: String): List<ConstraintSpec> {
return constraintsByJobId.getOrElse(jobId) { listOf() }
}
@Synchronized
override fun getAllConstraintSpecs(): List<ConstraintSpec> {
return constraintsByJobId.values.flatten()
return constraintsByJobId.getOrPut(jobId) {
jobDatabase.getConstraintSpecsForJobs(listOf(jobId)).toMutableList()
}
}
@Synchronized
@@ -301,12 +333,22 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
@Synchronized
override fun getAllDependencySpecs(): List<DependencySpec> {
override fun debugGetJobSpecs(limit: Int): List<JobSpec> {
return jobDatabase.getJobSpecs(limit)
}
@Synchronized
override fun debugGetConstraintSpecs(limit: Int): List<ConstraintSpec> {
return jobDatabase.getConstraintSpecs(limit)
}
@Synchronized
override fun debugGetAllDependencySpecs(): List<DependencySpec> {
return dependenciesByJobId.values.flatten()
}
private fun updateCachedJobSpecs(filter: (MinimalJobSpec) -> Boolean, transformer: (MinimalJobSpec) -> MinimalJobSpec, singleUpdate: Boolean = false) {
val iterator = jobs.listIterator()
val iterator = minimalJobs.listIterator()
while (iterator.hasNext()) {
val current = iterator.next()
@@ -329,10 +371,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
isMemoryOnly = updated.isMemoryOnly
)
jobSpecCache[updatedJobSpec.id] = updatedJobSpec
}
if (singleUpdate) {
return
if (singleUpdate) {
return
}
}
}
}
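The constraint cache's invariant (an entry for a jobId always holds _all_ of that job's constraints) is what makes the lazy fill in getConstraintSpecs() above safe: a partially populated entry would silently return an incomplete constraint list for that job. Conceptually the lookup behaves like this simplified standalone sketch (not the actual class):

// Cache hit: the entry is guaranteed to be complete.
// Cache miss: load every constraint for the job in one query and cache them together,
// so the entry is complete from the moment it exists.
fun constraintsFor(jobId: String): List<ConstraintSpec> {
  return constraintsByJobId.getOrPut(jobId) {
    jobDatabase.getConstraintSpecsForJobs(listOf(jobId)).toMutableList()
  }
}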


@@ -12,8 +12,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import kotlin.jvm.functions.Function1;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -88,7 +92,14 @@ public class JobMigratorTest {
private static JobStorage simpleJobStorage() {
JobStorage jobStorage = mock(JobStorage.class);
when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false, 0))));
JobSpec job = new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false, 0);
when(jobStorage.debugGetJobSpecs(anyInt())).thenReturn(new ArrayList<>(Collections.singletonList(job)));
doAnswer(invocation -> {
Function1<JobSpec, JobSpec> transformer = invocation.getArgument(0);
return transformer.invoke(job);
}).when(jobStorage).transformJobs(any());
return jobStorage;
}


@@ -20,9 +20,9 @@ class FastJobStorageTest {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
subject.init()
DataSet1.assertJobsMatch(subject.allJobSpecs)
DataSet1.assertConstraintsMatch(subject.allConstraintSpecs)
DataSet1.assertDependenciesMatch(subject.allDependencySpecs)
DataSet1.assertJobsMatch(subject.debugGetJobSpecs(1000))
DataSet1.assertConstraintsMatch(subject.debugGetConstraintSpecs(1000))
DataSet1.assertDependenciesMatch(subject.debugGetAllDependencySpecs())
}
@Test
@@ -30,9 +30,9 @@ class FastJobStorageTest {
val subject = FastJobStorage(mockDatabase(DataSetCircularDependency.FULL_SPECS))
subject.init()
DataSetCircularDependency.assertJobsMatch(subject.allJobSpecs)
DataSetCircularDependency.assertConstraintsMatch(subject.allConstraintSpecs)
DataSetCircularDependency.assertDependenciesMatch(subject.allDependencySpecs)
DataSetCircularDependency.assertJobsMatch(subject.debugGetJobSpecs(1000))
DataSetCircularDependency.assertConstraintsMatch(subject.debugGetConstraintSpecs(1000))
DataSetCircularDependency.assertDependenciesMatch(subject.debugGetAllDependencySpecs())
}
@Test
@@ -59,9 +59,9 @@ class FastJobStorageTest {
fun `insertJobs - data can be found`() {
val subject = FastJobStorage(mockDatabase())
subject.insertJobs(DataSet1.FULL_SPECS)
DataSet1.assertJobsMatch(subject.allJobSpecs)
DataSet1.assertConstraintsMatch(subject.allConstraintSpecs)
DataSet1.assertDependenciesMatch(subject.allDependencySpecs)
DataSet1.assertJobsMatch(subject.debugGetJobSpecs(1000))
DataSet1.assertConstraintsMatch(subject.debugGetConstraintSpecs(1000))
DataSet1.assertDependenciesMatch(subject.debugGetAllDependencySpecs())
}
@Test
@@ -164,6 +164,71 @@ class FastJobStorageTest {
subject.getJobSpec("3") assertIs fullSpec3.jobSpec
}
@Test
fun `transformJobs - writes to database`() {
val database = mockDatabase(DataSet1.FULL_SPECS)
val subject = FastJobStorage(database)
subject.init()
val transformer: (JobSpec) -> JobSpec = { it }
subject.transformJobs(transformer)
verify { database.transformJobs(transformer) }
}
@Test
fun `transformJobs - updates all fields`() {
val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1"), emptyList(), emptyList())
val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2"), emptyList(), emptyList())
val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3"), emptyList(), emptyList())
val update1 = jobSpec(
id = "1",
factoryKey = "g1",
queueKey = "q1",
createTime = 2,
lastRunAttemptTime = 2,
nextBackoffInterval = 2,
runAttempt = 2,
maxAttempts = 2,
lifespan = 2,
serializedData = "abc".toByteArray(),
serializedInputData = null,
isRunning = true,
isMemoryOnly = false
)
val update2 = jobSpec(
id = "2",
factoryKey = "g2",
queueKey = "q2",
createTime = 3,
lastRunAttemptTime = 3,
nextBackoffInterval = 3,
runAttempt = 3,
maxAttempts = 3,
lifespan = 3,
serializedData = "def".toByteArray(),
serializedInputData = "ghi".toByteArray(),
isRunning = true,
isMemoryOnly = false
)
val subject = FastJobStorage(mockDatabase(listOf(fullSpec1, fullSpec2, fullSpec3)))
subject.init()
subject.transformJobs {
when (it.id) {
"1" -> update1
"2" -> update2
else -> it
}
}
subject.getJobSpec("1") assertIs update1
subject.getJobSpec("2") assertIs update2
subject.getJobSpec("3") assertIs fullSpec3.jobSpec
}
@Test
fun `markJobAsRunning - writes to database`() {
val database = mockDatabase(DataSet1.FULL_SPECS)
@@ -614,9 +679,9 @@ class FastJobStorageTest {
subject.deleteJobs(listOf("id1"))
val jobs = subject.allJobSpecs
val constraints = subject.allConstraintSpecs
val dependencies = subject.allDependencySpecs
val jobs = subject.debugGetJobSpecs(1000)
val constraints = subject.debugGetConstraintSpecs(1000)
val dependencies = subject.debugGetAllDependencySpecs()
jobs.size assertIs 2
jobs[0] assertIs DataSet1.JOB_2
@@ -727,11 +792,11 @@ class FastJobStorageTest {
val dependencies = fullSpecs.map { it.dependencySpecs }.flatten().toMutableList()
val mock = mockk<JobDatabase>(relaxed = true)
every { mock.getAllJobSpecs() } returns jobs
every { mock.getJobSpecs(any()) } returns jobs
every { mock.getAllMinimalJobSpecs() } returns jobs.map { it.toMinimalJobSpec() }
every { mock.getOldestJobSpecs(any()) } answers { jobs.sortedBy { it.createTime }.take(firstArg()) }
every { mock.getAllConstraintSpecs() } returns constraints
every { mock.getConstraintSpecs(any()) } returns constraints
every { mock.getAllDependencySpecs() } returns dependencies
every { mock.getConstraintSpecsForJobs(any()) } returns constraints
every { mock.getJobSpec(any()) } answers { jobs.first { it.id == firstArg() } }
every { mock.insertJobs(any()) } answers {
val inserts: List<FullSpec> = firstArg()
@@ -754,6 +819,20 @@ class FastJobStorageTest {
jobs += update
}
}
every { mock.transformJobs(any()) } answers {
val transformer: (JobSpec) -> JobSpec = firstArg()
val iterator = jobs.listIterator()
val out = mutableListOf<JobSpec>()
while (iterator.hasNext()) {
val current = iterator.next()
val updated = transformer(current)
iterator.set(updated)
if (current != updated) {
out += updated
}
}
out
}
every { mock.updateAllJobsToBePending() } answers {
val iterator = jobs.listIterator()
while (iterator.hasNext()) {