|
|
|
|
@@ -396,7 +396,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
|
|
|
|
|
createTime = updated.createTime,
|
|
|
|
|
lastRunAttemptTime = updated.lastRunAttemptTime,
|
|
|
|
|
nextBackoffInterval = updated.nextBackoffInterval,
|
|
|
|
|
priority = updated.priority,
|
|
|
|
|
globalPriority = updated.globalPriority,
|
|
|
|
|
isRunning = updated.isRunning,
|
|
|
|
|
isMemoryOnly = updated.isMemoryOnly
|
|
|
|
|
)
|
|
|
|
|
@@ -413,35 +413,36 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
|
|
|
|
|
/**
|
|
|
|
|
* Heart of a lot of the in-memory job management. Will ensure that we have an up-to-date list of eligible jobs in sorted order.
|
|
|
|
|
*/
|
|
|
|
|
private fun placeJobInEligibleList(job: MinimalJobSpec) {
|
|
|
|
|
var jobToPlace: MinimalJobSpec? = job
|
|
|
|
|
private fun placeJobInEligibleList(jobCandidate: MinimalJobSpec) {
|
|
|
|
|
val existingJobInQueue = jobCandidate.queueKey?.let { mostEligibleJobForQueue[it] }
|
|
|
|
|
if (existingJobInQueue != null) {
|
|
|
|
|
if (jobCandidate.globalPriority < existingJobInQueue.globalPriority) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (job.queueKey != null) {
|
|
|
|
|
val existingJobInQueue = mostEligibleJobForQueue[job.queueKey]
|
|
|
|
|
if (existingJobInQueue != null) {
|
|
|
|
|
// We only want a single job from each queue. It should be the oldest job with the highest priority.
|
|
|
|
|
if (job.priority > existingJobInQueue.priority || (job.priority == existingJobInQueue.priority && job.createTime < existingJobInQueue.createTime)) {
|
|
|
|
|
mostEligibleJobForQueue[job.queueKey] = job
|
|
|
|
|
eligibleJobs.removeIf { it.id == existingJobInQueue.id }
|
|
|
|
|
} else {
|
|
|
|
|
// There's a more eligible job in the queue already, so no need to put it in the eligible list
|
|
|
|
|
jobToPlace = null
|
|
|
|
|
if (jobCandidate.globalPriority == existingJobInQueue.globalPriority) {
|
|
|
|
|
if (jobCandidate.queuePriority < existingJobInQueue.queuePriority) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (jobCandidate.queuePriority == existingJobInQueue.queuePriority && jobCandidate.createTime >= existingJobInQueue.createTime) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (jobToPlace == null) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// At this point, we know that the job candidate has a higher global priority, higher queue priority, or their priorities are the same but with an older creation time.
|
|
|
|
|
// That means we know it's now the most eligible job in its queue.
|
|
|
|
|
|
|
|
|
|
jobToPlace.queueKey?.let { queueKey ->
|
|
|
|
|
mostEligibleJobForQueue[queueKey] = job
|
|
|
|
|
jobCandidate.queueKey?.let { queueKey ->
|
|
|
|
|
eligibleJobs.removeIf { it.id == existingJobInQueue?.id }
|
|
|
|
|
mostEligibleJobForQueue[queueKey] = jobCandidate
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// At this point, anything queue-related has been handled. We just need to insert this job in the correct spot in the list.
|
|
|
|
|
// Thankfully, we're using a TreeSet, so sorting is automatic.
|
|
|
|
|
|
|
|
|
|
eligibleJobs += jobToPlace
|
|
|
|
|
eligibleJobs += jobCandidate
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@@ -522,14 +523,15 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
|
|
|
|
|
|
|
|
|
|
private object EligibleMinJobComparator : Comparator<MinimalJobSpec> {
|
|
|
|
|
override fun compare(o1: MinimalJobSpec, o2: MinimalJobSpec): Int {
|
|
|
|
|
// We want to sort by priority descending, then createTime ascending
|
|
|
|
|
// We want to sort by priority descending, then createTime ascending.
|
|
|
|
|
// This is for determining which job to run across multiple queues, so queue priority is not considered.
|
|
|
|
|
|
|
|
|
|
// CAUTION: This is used by a TreeSet, so it must be consistent with equals.
|
|
|
|
|
// If this compare function says two objects are equal, then only one will be allowed in the set!
|
|
|
|
|
// This is why the last step is to compare the IDs.
|
|
|
|
|
return when {
|
|
|
|
|
o1.priority > o2.priority -> -1
|
|
|
|
|
o1.priority < o2.priority -> 1
|
|
|
|
|
o1.globalPriority > o2.globalPriority -> -1
|
|
|
|
|
o1.globalPriority < o2.globalPriority -> 1
|
|
|
|
|
o1.createTime < o2.createTime -> -1
|
|
|
|
|
o1.createTime > o2.createTime -> 1
|
|
|
|
|
else -> o1.id.compareTo(o2.id)
|
|
|
|
|
@@ -543,8 +545,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
|
|
|
|
|
private object EligibleFullJobComparator : Comparator<JobSpec> {
|
|
|
|
|
override fun compare(o1: JobSpec, o2: JobSpec): Int {
|
|
|
|
|
return when {
|
|
|
|
|
o1.priority > o2.priority -> -1
|
|
|
|
|
o1.priority < o2.priority -> 1
|
|
|
|
|
o1.globalPriority > o2.globalPriority -> -1
|
|
|
|
|
o1.globalPriority < o2.globalPriority -> 1
|
|
|
|
|
o1.createTime < o2.createTime -> -1
|
|
|
|
|
o1.createTime > o2.createTime -> 1
|
|
|
|
|
else -> o1.id.compareTo(o2.id)
|
|
|
|
|
@@ -569,7 +571,8 @@ fun JobSpec.toMinimalJobSpec(): MinimalJobSpec {
|
|
|
|
|
createTime = this.createTime,
|
|
|
|
|
lastRunAttemptTime = this.lastRunAttemptTime,
|
|
|
|
|
nextBackoffInterval = this.nextBackoffInterval,
|
|
|
|
|
priority = this.priority,
|
|
|
|
|
globalPriority = this.globalPriority,
|
|
|
|
|
queuePriority = this.queuePriority,
|
|
|
|
|
isRunning = this.isRunning,
|
|
|
|
|
isMemoryOnly = this.isMemoryOnly
|
|
|
|
|
)
|
|
|
|
|
|