Improve efficiency of sorting jobs in FastJobStorage.

This commit is contained in:
Greyson Parrelli
2024-07-15 10:42:10 -04:00
committed by Nicholas Tinsley
parent 625ca832b0
commit eb59afc33c
2 changed files with 300 additions and 93 deletions

View File

@@ -7,17 +7,33 @@ import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage
import java.util.TreeSet
class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
// TODO [job] We need a new jobspec that has no data (for space efficiency), and ideally no other random stuff that we don't need for filtering
private val jobs: MutableList<JobSpec> = mutableListOf()
private val constraintsByJobId: MutableMap<String, MutableList<ConstraintSpec>> = mutableMapOf()
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = mutableMapOf()
private val eligibleJobs: TreeSet<JobSpec> = TreeSet(EligibleJobComparator)
private val migrationJobs: TreeSet<JobSpec> = TreeSet(compareBy { it.createTime })
private val mostEligibleJobForQueue: MutableMap<String, JobSpec> = hashMapOf()
@Synchronized
override fun init() {
jobs += jobDatabase.getAllJobSpecs()
for (job in jobs) {
if (job.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
migrationJobs += job
} else {
// TODO [job] Because we're using a TreeSet, this operation becomes n*log(n). Ideal complexity for a sort, but think more about whether a bulk sort would be better.
placeJobInEligibleList(job)
}
}
for (constraintSpec in jobDatabase.getAllConstraintSpecs()) {
val jobConstraints: MutableList<ConstraintSpec> = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() }
jobConstraints += constraintSpec
@@ -39,6 +55,13 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
for (fullSpec in fullSpecs) {
jobs += fullSpec.jobSpec
if (fullSpec.jobSpec.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
migrationJobs += fullSpec.jobSpec
} else {
placeJobInEligibleList(fullSpec.jobSpec)
}
constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs.toMutableList()
dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs.toMutableList()
}
@@ -51,39 +74,28 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getAllJobSpecs(): List<JobSpec> {
// TODO [job] this will have to change
return ArrayList(jobs)
}
@Synchronized
override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec> {
val migrationJob: JobSpec? = getMigrationJob()
val migrationJob: JobSpec? = migrationJobs.firstOrNull()
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) {
listOf(migrationJob)
} else if (migrationJob != null) {
emptyList()
} else {
jobs
.groupBy {
// Group together by queue. If it doesn't have a queue, we just use the ID, since it's unique and will give us all of the jobs without queues.
it.queueKey ?: it.id
}
.map { byQueueKey: Map.Entry<String, List<JobSpec>> ->
// We want to find the next job we should run within each queue. It should be the oldest job within the group of jobs with the highest priority.
// We can get this by sorting by createTime, then taking first job in that list that has the max priority.
byQueueKey.value
.sortedBy { it.createTime }
.maxByOrNull { it.priority }
}
.filterNotNull()
eligibleJobs
.asSequence()
.filter { job ->
// Filter out all jobs with unmet dependencies
dependenciesByJobId[job.id].isNullOrEmpty()
}
.filterNot { it.isRunning }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.sortedBy { it.createTime }
.sortedByDescending { it.priority }
.toList()
// Note: The priority sort at the end is safe because it's stable. That means that within jobs with the same priority, they will still be sorted by createTime.
}
@@ -91,27 +103,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getJobsInQueue(queue: String): List<JobSpec> {
return jobs
.filter { it.queueKey == queue }
.sortedBy { it.createTime }
}
private fun getMigrationJob(): JobSpec? {
return jobs
.filter { it.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY }
.firstOrNull { firstInQueue(it) }
}
private fun firstInQueue(job: JobSpec): Boolean {
return if (job.queueKey == null) {
true
} else {
val firstInQueue: JobSpec? = jobs
.filter { it.queueKey == job.queueKey }
.minByOrNull { it.createTime }
job == firstInQueue
}
return jobs.filter { it.queueKey == queue }
}
@Synchronized
@@ -135,46 +127,59 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun markJobAsRunning(id: String, currentTime: Long) {
val job: JobSpec? = getJobById(id)
val job: JobSpec? = getJobSpec(id)
if (job == null || !job.isMemoryOnly) {
jobDatabase.markJobAsRunning(id, currentTime)
}
val iter = jobs.listIterator()
while (iter.hasNext()) {
val current: JobSpec = iter.next()
if (current.id == id) {
iter.set(
current.copy(
isRunning = true,
lastRunAttemptTime = currentTime
)
updateJobsInMemory(
filter = { it.id == id },
transformer = { jobSpec ->
jobSpec.copy(
isRunning = true,
lastRunAttemptTime = currentTime
)
}
}
},
singleUpdate = true
)
}
@Synchronized
override fun updateJobAfterRetry(id: String, currentTime: Long, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?) {
val job = getJobById(id)
val job = getJobSpec(id)
if (job == null || !job.isMemoryOnly) {
jobDatabase.updateJobAfterRetry(id, currentTime, runAttempt, nextBackoffInterval, serializedData)
}
val iter = jobs.listIterator()
while (iter.hasNext()) {
val current = iter.next()
if (current.id == id) {
iter.set(
current.copy(
isRunning = false,
runAttempt = runAttempt,
lastRunAttemptTime = currentTime,
nextBackoffInterval = nextBackoffInterval,
serializedData = serializedData
)
updateJobsInMemory(
filter = { it.id == id },
transformer = { jobSpec ->
jobSpec.copy(
isRunning = false,
runAttempt = runAttempt,
lastRunAttemptTime = currentTime,
nextBackoffInterval = nextBackoffInterval,
serializedData = serializedData
)
},
singleUpdate = true
)
}
private fun updateJobsInMemory(filter: (JobSpec) -> Boolean, transformer: (JobSpec) -> JobSpec, singleUpdate: Boolean = false) {
val iterator = jobs.listIterator()
while (iterator.hasNext()) {
val current = iterator.next()
if (filter(current)) {
val updated = transformer(current)
iterator.set(updated)
replaceJobInEligibleList(current, updated)
if (singleUpdate) {
return
}
}
}
}
@@ -183,18 +188,21 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
override fun updateAllJobsToBePending() {
jobDatabase.updateAllJobsToBePending()
val iter = jobs.listIterator()
while (iter.hasNext()) {
val current = iter.next()
iter.set(current.copy(isRunning = false))
}
updateJobsInMemory(
filter = { it.isRunning },
transformer = { jobSpec ->
jobSpec.copy(
isRunning = false
)
}
)
}
@Synchronized
override fun updateJobs(jobSpecs: List<JobSpec>) {
val durable: List<JobSpec> = jobSpecs
.filter { updatedJob ->
val found = getJobById(updatedJob.id)
val found = getJobSpec(updatedJob.id)
found != null && !found.isMemoryOnly
}
@@ -204,15 +212,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
val updatesById: Map<String, JobSpec> = jobSpecs.associateBy { it.id }
val iter = jobs.listIterator()
while (iter.hasNext()) {
val current = iter.next()
val update = updatesById[current.id]
if (update != null) {
iter.set(update)
}
}
updateJobsInMemory(
filter = { updatesById.containsKey(it.id) },
transformer = { updatesById.getValue(it.id) }
)
}
@Synchronized
@@ -222,17 +225,21 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun deleteJobs(jobIds: List<String>) {
val durableIds: List<String> = jobIds
.mapNotNull { getJobById(it) }
val jobsToDelete: Set<JobSpec> = jobIds
.mapNotNull { getJobSpec(it) }
.toSet()
val durableJobIdsToDelete: List<String> = jobsToDelete
.filterNot { it.isMemoryOnly }
.map { it.id }
if (durableIds.isNotEmpty()) {
jobDatabase.deleteJobs(durableIds)
if (durableJobIdsToDelete.isNotEmpty()) {
jobDatabase.deleteJobs(durableJobIdsToDelete)
}
val deleteIds: Set<String> = jobIds.toSet()
jobs.removeIf { deleteIds.contains(it.id) }
eligibleJobs.removeAll(jobsToDelete)
migrationJobs.removeAll(jobsToDelete)
for (jobId in jobIds) {
constraintsByJobId.remove(jobId)
@@ -277,19 +284,59 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
return all
}
private fun getSingleLayerOfDependencySpecsThatDependOnJob(jobSpecId: String): List<DependencySpec> {
return dependenciesByJobId
.values
.flatten()
.filter { it.dependsOnJobId == jobSpecId }
}
@Synchronized
override fun getAllDependencySpecs(): List<DependencySpec> {
return dependenciesByJobId.values.flatten()
}
private fun getJobById(id: String): JobSpec? {
return jobs.firstOrNull { it.id == id }
private fun placeJobInEligibleList(job: JobSpec) {
var jobToPlace: JobSpec? = job
if (job.queueKey != null) {
val existingJobInQueue = mostEligibleJobForQueue[job.queueKey]
if (existingJobInQueue != null) {
// We only want a single job from each queue. It should be the oldest job with the highest priority.
if (job.priority > existingJobInQueue.priority || (job.priority == existingJobInQueue.priority && job.createTime < existingJobInQueue.createTime)) {
mostEligibleJobForQueue[job.queueKey] = job
eligibleJobs.remove(existingJobInQueue)
} else {
// There's a more eligible job in the queue already, so no need to put it in the eligible list
jobToPlace = null
}
}
}
if (jobToPlace == null) {
return
}
jobToPlace.queueKey?.let { queueKey ->
mostEligibleJobForQueue[queueKey] = job
}
// At this point, anything queue-related has been handled. We just need to insert this job in the correct spot in the list.
// Thankfully, we're using a TreeSet, so sorting is automatic.
eligibleJobs += jobToPlace
}
private fun replaceJobInEligibleList(current: JobSpec?, updated: JobSpec?) {
if (current == null || updated == null) {
return
}
if (updated.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
migrationJobs.remove(current)
migrationJobs += updated
} else {
eligibleJobs.remove(current)
current.queueKey?.let { queueKey ->
if (mostEligibleJobForQueue[queueKey] == current) {
mostEligibleJobForQueue.remove(queueKey)
}
}
placeJobInEligibleList(updated)
}
}
/**
@@ -304,8 +351,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
* serves the same effect and doesn't require new write methods. This should also be very rare.
*/
private fun DependencySpec.hasCircularDependency(): Boolean {
val job = getJobById(this.jobId)
val dependsOnJob = getJobById(this.dependsOnJobId)
val job = getJobSpec(this.jobId)
val dependsOnJob = getJobSpec(this.dependsOnJobId)
if (job == null || dependsOnJob == null) {
return false
@@ -328,4 +375,37 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
private fun JobSpec.hasEligibleRunTime(currentTime: Long): Boolean {
return this.lastRunAttemptTime > currentTime || (this.lastRunAttemptTime + this.nextBackoffInterval) < currentTime
}
private fun getSingleLayerOfDependencySpecsThatDependOnJob(jobSpecId: String): List<DependencySpec> {
return dependenciesByJobId
.values
.flatten()
.filter { it.dependsOnJobId == jobSpecId }
}
private object EligibleJobComparator : Comparator<JobSpec> {
override fun compare(o1: JobSpec, o2: JobSpec): Int {
// We want to sort by priority descending, then createTime ascending
// CAUTION: This is used by a TreeSet, so it must be consistent with equals.
// If this compare function says two objects are equal, then only one will be allowed in the set!
return when {
o1.priority > o2.priority -> -1
o1.priority < o2.priority -> 1
o1.createTime < o2.createTime -> -1
o1.createTime > o2.createTime -> 1
else -> o1.id.compareTo(o2.id)
}
}
}
private data class MinimalJobSpec(
val id: String,
val factoryKey: String,
val queueKey: String?,
val createTime: Long,
val priority: Int,
val isRunning: Boolean,
val isMemoryOnly: Boolean
)
}

View File

@@ -452,6 +452,133 @@ class FastJobStorageTest {
jobs.size assertIs 0
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after deleted, no longer is in eligible list`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
subject.deleteJobs(listOf("id1"))
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after marked running, no longer is in eligible list`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
subject.markJobAsRunning("id1", 1)
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after updateJobAfterRetry to be invalid, no longer is in eligible list`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
subject.updateJobAfterRetry("id1", 1, 1000, 1_000_000, null)
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after invalid then marked pending, is in eligible list`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
subject.markJobAsRunning("id1", 1)
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
subject.updateAllJobsToBePending()
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.filter { it.id == DataSet1.JOB_1.id }.size assertIs 1 // The last run attempt time changes, so some fields will be different
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after updateJobs to be invalid, no longer is in eligible list`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
subject.updateJobs(listOf(DataSet1.JOB_1.copy(isRunning = true)))
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - newly-inserted higher-priority job in queue replaces old`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
val higherPriorityJob = DataSet1.JOB_1.copy(id = "id-bigboi", priority = Job.Parameters.PRIORITY_HIGH)
subject.insertJobs(listOf(FullSpec(jobSpec = higherPriorityJob, constraintSpecs = emptyList(), dependencySpecs = emptyList())))
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
jobs.contains(higherPriorityJob) assertIs true
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - updating job to have a higher priority replaces lower priority in queue`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
val lowerPriorityJob = DataSet1.JOB_1.copy(id = "id-bigboi", priority = Job.Parameters.PRIORITY_LOW)
subject.insertJobs(listOf(FullSpec(jobSpec = lowerPriorityJob, constraintSpecs = emptyList(), dependencySpecs = emptyList())))
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
jobs.contains(lowerPriorityJob) assertIs false
val higherPriorityJob = lowerPriorityJob.copy(priority = Job.Parameters.PRIORITY_HIGH)
subject.updateJobs(listOf(higherPriorityJob))
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
jobs.contains(higherPriorityJob) assertIs true
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - updating job to have an older createTime replaces newer in queue`() {
val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS))
subject.init()
val newerJob = DataSet1.JOB_1.copy(id = "id-bigboi", createTime = 1000)
subject.insertJobs(listOf(FullSpec(jobSpec = newerJob, constraintSpecs = emptyList(), dependencySpecs = emptyList())))
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs true
jobs.contains(newerJob) assertIs false
val olderJob = newerJob.copy(createTime = 0)
subject.updateJobs(listOf(olderJob))
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.contains(DataSet1.JOB_1) assertIs false
jobs.contains(olderJob) assertIs true
}
@Test
fun `deleteJobs - writes to database`() {
val database = fixedDataDatabase(DataSet1.FULL_SPECS)