Use a minimal job spec representation in memory.

This commit is contained in:
Greyson Parrelli
2024-07-16 10:53:10 -04:00
committed by Nicholas Tinsley
parent eb59afc33c
commit 973dc72cfa
5 changed files with 340 additions and 142 deletions

View File

@@ -13,6 +13,7 @@ import org.signal.core.util.delete
import org.signal.core.util.insertInto
import org.signal.core.util.logging.Log
import org.signal.core.util.readToList
import org.signal.core.util.readToSingleObject
import org.signal.core.util.requireBlob
import org.signal.core.util.requireBoolean
import org.signal.core.util.requireInt
@@ -29,6 +30,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobs.MinimalJobSpec
class JobDatabase(
application: Application,
@@ -183,6 +185,43 @@ class JobDatabase(
@Synchronized
fun getAllJobSpecs(): List<JobSpec> {
return readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.orderBy("${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
.run()
.readToList { cursor ->
jobSpecFromCursor(cursor)
}
}
@Synchronized
fun getOldestJobSpecs(limit: Int): List<JobSpec> {
return readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.orderBy("${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
.limit(limit)
.run()
.readToList { cursor ->
jobSpecFromCursor(cursor)
}
}
@Synchronized
fun getJobSpec(id: String): JobSpec? {
return readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.where("${Jobs.JOB_SPEC_ID} = ?", id)
.run()
.readToSingleObject {
jobSpecFromCursor(it)
}
}
@Synchronized
fun getAllMinimalJobSpecs(): List<MinimalJobSpec> {
val columns = arrayOf(
Jobs.ID,
Jobs.JOB_SPEC_ID,
@@ -191,18 +230,23 @@ class JobDatabase(
Jobs.CREATE_TIME,
Jobs.LAST_RUN_ATTEMPT_TIME,
Jobs.NEXT_BACKOFF_INTERVAL,
Jobs.RUN_ATTEMPT,
Jobs.MAX_ATTEMPTS,
Jobs.LIFESPAN,
Jobs.SERIALIZED_DATA,
Jobs.SERIALIZED_INPUT_DATA,
Jobs.IS_RUNNING,
Jobs.PRIORITY
)
return readableDatabase
.query(Jobs.TABLE_NAME, columns, null, null, null, null, "${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
.readToList { cursor ->
jobSpecFromCursor(cursor)
MinimalJobSpec(
id = cursor.requireNonNullString(Jobs.JOB_SPEC_ID),
factoryKey = cursor.requireNonNullString(Jobs.FACTORY_KEY),
queueKey = cursor.requireNonNullString(Jobs.QUEUE_KEY),
createTime = cursor.requireLong(Jobs.CREATE_TIME),
lastRunAttemptTime = cursor.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME),
nextBackoffInterval = cursor.requireLong(Jobs.NEXT_BACKOFF_INTERVAL),
priority = cursor.requireInt(Jobs.PRIORITY),
isRunning = cursor.requireBoolean(Jobs.IS_RUNNING),
isMemoryOnly = false
)
}
}

View File

@@ -1,5 +1,6 @@
package org.thoughtcrime.securesms.jobs
import androidx.annotation.VisibleForTesting
import org.thoughtcrime.securesms.database.JobDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec
@@ -7,33 +8,43 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage
import org.thoughtcrime.securesms.util.LRUCache
import java.util.TreeSet
class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
// TODO [job] We need a new jobspec that has no data (for space efficiency), and ideally no other random stuff that we don't need for filtering
companion object {
private const val JOB_CACHE_LIMIT = 1000
}
private val jobs: MutableList<JobSpec> = mutableListOf()
private val jobSpecCache: LRUCache<String, JobSpec> = LRUCache(JOB_CACHE_LIMIT)
private val jobs: MutableList<MinimalJobSpec> = mutableListOf()
// TODO [job] Rather than duplicate what is likely the same handful of constraints over and over, we should somehow re-use instances
private val constraintsByJobId: MutableMap<String, MutableList<ConstraintSpec>> = mutableMapOf()
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = mutableMapOf()
private val eligibleJobs: TreeSet<JobSpec> = TreeSet(EligibleJobComparator)
private val migrationJobs: TreeSet<JobSpec> = TreeSet(compareBy { it.createTime })
private val mostEligibleJobForQueue: MutableMap<String, JobSpec> = hashMapOf()
private val eligibleJobs: TreeSet<MinimalJobSpec> = TreeSet(EligibleJobComparator)
private val migrationJobs: TreeSet<MinimalJobSpec> = TreeSet(compareBy { it.createTime })
private val mostEligibleJobForQueue: MutableMap<String, MinimalJobSpec> = hashMapOf()
@Synchronized
override fun init() {
jobs += jobDatabase.getAllJobSpecs()
jobs += jobDatabase.getAllMinimalJobSpecs()
for (job in jobs) {
if (job.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
migrationJobs += job
} else {
// TODO [job] Because we're using a TreeSet, this operation becomes n*log(n). Ideal complexity for a sort, but think more about whether a bulk sort would be better.
placeJobInEligibleList(job)
}
}
jobDatabase.getOldestJobSpecs(JOB_CACHE_LIMIT).forEach {
jobSpecCache[it.id] = it
}
for (constraintSpec in jobDatabase.getAllConstraintSpecs()) {
val jobConstraints: MutableList<ConstraintSpec> = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() }
jobConstraints += constraintSpec
@@ -54,12 +65,14 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
for (fullSpec in fullSpecs) {
jobs += fullSpec.jobSpec
val minimalJobSpec = fullSpec.jobSpec.toMinimalJobSpec()
jobs += minimalJobSpec
jobSpecCache[fullSpec.jobSpec.id] = fullSpec.jobSpec
if (fullSpec.jobSpec.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
migrationJobs += fullSpec.jobSpec
migrationJobs += minimalJobSpec
} else {
placeJobInEligibleList(fullSpec.jobSpec)
placeJobInEligibleList(minimalJobSpec)
}
constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs.toMutableList()
@@ -69,21 +82,21 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getJobSpec(id: String): JobSpec? {
return jobs.firstOrNull { it.id == id }
return jobs.firstOrNull { it.id == id }?.toJobSpec()
}
@Synchronized
override fun getAllJobSpecs(): List<JobSpec> {
// TODO [job] this will have to change
return ArrayList(jobs)
return jobDatabase.getAllJobSpecs()
}
@Synchronized
override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec> {
val migrationJob: JobSpec? = migrationJobs.firstOrNull()
val migrationJob: MinimalJobSpec? = migrationJobs.firstOrNull()
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) {
listOf(migrationJob)
listOf(migrationJob.toJobSpec())
} else if (migrationJob != null) {
emptyList()
} else {
@@ -95,15 +108,16 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
.filterNot { it.isRunning }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.map { it.toJobSpec() }
.toList()
// Note: The priority sort at the end is safe because it's stable. That means that within jobs with the same priority, they will still be sorted by createTime.
}
}
@Synchronized
override fun getJobsInQueue(queue: String): List<JobSpec> {
return jobs.filter { it.queueKey == queue }
return jobs
.filter { it.queueKey == queue }
.map { it.toJobSpec() }
}
@Synchronized
@@ -130,9 +144,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
val job: JobSpec? = getJobSpec(id)
if (job == null || !job.isMemoryOnly) {
jobDatabase.markJobAsRunning(id, currentTime)
// Don't need to update jobSpecCache because all changed fields are in the min spec
}
updateJobsInMemory(
updateCachedJobSpecs(
filter = { it.id == id },
transformer = { jobSpec ->
jobSpec.copy(
@@ -149,46 +164,35 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
val job = getJobSpec(id)
if (job == null || !job.isMemoryOnly) {
jobDatabase.updateJobAfterRetry(id, currentTime, runAttempt, nextBackoffInterval, serializedData)
// Note: All other fields are accounted for in the min spec. We only need to update from disk if serialized data changes.
val cached = jobSpecCache[id]
if (cached != null && !cached.serializedData.contentEquals(serializedData)) {
jobDatabase.getJobSpec(id)?.let {
jobSpecCache[id] = it
}
}
}
updateJobsInMemory(
updateCachedJobSpecs(
filter = { it.id == id },
transformer = { jobSpec ->
jobSpec.copy(
isRunning = false,
runAttempt = runAttempt,
lastRunAttemptTime = currentTime,
nextBackoffInterval = nextBackoffInterval,
serializedData = serializedData
nextBackoffInterval = nextBackoffInterval
)
},
singleUpdate = true
)
}
private fun updateJobsInMemory(filter: (JobSpec) -> Boolean, transformer: (JobSpec) -> JobSpec, singleUpdate: Boolean = false) {
val iterator = jobs.listIterator()
while (iterator.hasNext()) {
val current = iterator.next()
if (filter(current)) {
val updated = transformer(current)
iterator.set(updated)
replaceJobInEligibleList(current, updated)
if (singleUpdate) {
return
}
}
}
}
@Synchronized
override fun updateAllJobsToBePending() {
jobDatabase.updateAllJobsToBePending()
// Don't need to update jobSpecCache because all changed fields are in the min spec
updateJobsInMemory(
updateCachedJobSpecs(
filter = { it.isRunning },
transformer = { jobSpec ->
jobSpec.copy(
@@ -210,12 +214,18 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
jobDatabase.updateJobs(durable)
}
val updatesById: Map<String, JobSpec> = jobSpecs.associateBy { it.id }
val updatesById: Map<String, MinimalJobSpec> = jobSpecs
.map { it.toMinimalJobSpec() }
.associateBy { it.id }
updateJobsInMemory(
updateCachedJobSpecs(
filter = { updatesById.containsKey(it.id) },
transformer = { updatesById.getValue(it.id) }
)
for (update in jobSpecs) {
jobSpecCache[update.id] = update
}
}
@Synchronized
@@ -228,18 +238,24 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
val jobsToDelete: Set<JobSpec> = jobIds
.mapNotNull { getJobSpec(it) }
.toSet()
val durableJobIdsToDelete: List<String> = jobsToDelete
.filterNot { it.isMemoryOnly }
.map { it.id }
val minimalJobsToDelete: Set<MinimalJobSpec> = jobsToDelete
.map { it.toMinimalJobSpec() }
.toSet()
if (durableJobIdsToDelete.isNotEmpty()) {
jobDatabase.deleteJobs(durableJobIdsToDelete)
}
val deleteIds: Set<String> = jobIds.toSet()
jobs.removeIf { deleteIds.contains(it.id) }
eligibleJobs.removeAll(jobsToDelete)
migrationJobs.removeAll(jobsToDelete)
jobSpecCache.keys.removeAll(deleteIds)
eligibleJobs.removeAll(minimalJobsToDelete)
migrationJobs.removeAll(minimalJobsToDelete)
for (jobId in jobIds) {
constraintsByJobId.remove(jobId)
@@ -289,8 +305,44 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
return dependenciesByJobId.values.flatten()
}
private fun placeJobInEligibleList(job: JobSpec) {
var jobToPlace: JobSpec? = job
private fun updateCachedJobSpecs(filter: (MinimalJobSpec) -> Boolean, transformer: (MinimalJobSpec) -> MinimalJobSpec, singleUpdate: Boolean = false) {
val iterator = jobs.listIterator()
while (iterator.hasNext()) {
val current = iterator.next()
if (filter(current)) {
val updated = transformer(current)
iterator.set(updated)
replaceJobInEligibleList(current, updated)
jobSpecCache.remove(current.id)?.let { currentJobSpec ->
val updatedJobSpec = currentJobSpec.copy(
id = updated.id,
factoryKey = updated.factoryKey,
queueKey = updated.queueKey,
createTime = updated.createTime,
lastRunAttemptTime = updated.lastRunAttemptTime,
nextBackoffInterval = updated.nextBackoffInterval,
priority = updated.priority,
isRunning = updated.isRunning,
isMemoryOnly = updated.isMemoryOnly
)
jobSpecCache[updatedJobSpec.id] = updatedJobSpec
}
if (singleUpdate) {
return
}
}
}
}
/**
* Heart of a lot of the in-memory job management. Will ensure that we have an up-to-date list of eligible jobs in sorted order.
*/
private fun placeJobInEligibleList(job: MinimalJobSpec) {
var jobToPlace: MinimalJobSpec? = job
if (job.queueKey != null) {
val existingJobInQueue = mostEligibleJobForQueue[job.queueKey]
@@ -320,7 +372,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
eligibleJobs += jobToPlace
}
private fun replaceJobInEligibleList(current: JobSpec?, updated: JobSpec?) {
/**
* Replaces a job in the eligible list with an updated version of the job.
*/
private fun replaceJobInEligibleList(current: MinimalJobSpec?, updated: MinimalJobSpec?) {
if (current == null || updated == null) {
return
}
@@ -372,7 +427,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
/**
* Whether or not the job's eligible to be run based off of it's [Job.nextBackoffInterval] and other properties.
*/
private fun JobSpec.hasEligibleRunTime(currentTime: Long): Boolean {
private fun MinimalJobSpec.hasEligibleRunTime(currentTime: Long): Boolean {
return this.lastRunAttemptTime > currentTime || (this.lastRunAttemptTime + this.nextBackoffInterval) < currentTime
}
@@ -383,12 +438,23 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
.filter { it.dependsOnJobId == jobSpecId }
}
private object EligibleJobComparator : Comparator<JobSpec> {
override fun compare(o1: JobSpec, o2: JobSpec): Int {
/**
* Converts a [MinimalJobSpec] to a [JobSpec]. We prefer using the cache, but if it's not found, we'll hit the database.
* We consider this a "recent access" and will cache it for future use.
*/
private fun MinimalJobSpec.toJobSpec(): JobSpec {
return jobSpecCache.getOrPut(this.id) {
jobDatabase.getJobSpec(this.id) ?: throw IllegalArgumentException("JobSpec not found for id: $id")
}
}
private object EligibleJobComparator : Comparator<MinimalJobSpec> {
override fun compare(o1: MinimalJobSpec, o2: MinimalJobSpec): Int {
// We want to sort by priority descending, then createTime ascending
// CAUTION: This is used by a TreeSet, so it must be consistent with equals.
// If this compare function says two objects are equal, then only one will be allowed in the set!
// This is why the last step is to compare the IDs.
return when {
o1.priority > o2.priority -> -1
o1.priority < o2.priority -> 1
@@ -398,14 +464,22 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
}
}
private data class MinimalJobSpec(
val id: String,
val factoryKey: String,
val queueKey: String?,
val createTime: Long,
val priority: Int,
val isRunning: Boolean,
val isMemoryOnly: Boolean
/**
* Converts a [JobSpec] to a [MinimalJobSpec], which is just a matter of trimming off unnecessary properties.
*/
@VisibleForTesting
fun JobSpec.toMinimalJobSpec(): MinimalJobSpec {
return MinimalJobSpec(
id = this.id,
factoryKey = this.factoryKey,
queueKey = this.queueKey,
createTime = this.createTime,
lastRunAttemptTime = this.lastRunAttemptTime,
nextBackoffInterval = this.nextBackoffInterval,
priority = this.priority,
isRunning = this.isRunning,
isMemoryOnly = this.isMemoryOnly
)
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
/**
* A smaller version of [org.thoughtcrime.securesms.jobmanager.persistence.JobSpec] that contains on the the data we need
* to sort and pick jobs in [FastJobStorage].
*/
data class MinimalJobSpec(
val id: String,
val factoryKey: String,
val queueKey: String?,
val createTime: Long,
val lastRunAttemptTime: Long,
val nextBackoffInterval: Long,
val priority: Int,
val isRunning: Boolean,
val isMemoryOnly: Boolean
)