Add the ability to set job priority.

This commit is contained in:
Greyson Parrelli
2023-11-03 09:21:27 -07:00
committed by GitHub
parent d00f2aa8d0
commit 145794bf04
8 changed files with 127 additions and 26 deletions

View File

@@ -59,6 +59,7 @@ class JobDatabase(
const val SERIALIZED_DATA = "serialized_data"
const val SERIALIZED_INPUT_DATA = "serialized_input_data"
const val IS_RUNNING = "is_running"
const val PRIORITY = "priority"
val CREATE_TABLE =
"""
@@ -75,7 +76,8 @@ class JobDatabase(
$SERIALIZED_DATA TEXT,
$SERIALIZED_INPUT_DATA TEXT DEFAULT NULL,
$IS_RUNNING INTEGER,
$NEXT_BACKOFF_INTERVAL INTEGER
$NEXT_BACKOFF_INTERVAL INTEGER,
$PRIORITY INTEGER DEFAULT 0
)
""".trimIndent()
}
@@ -145,6 +147,10 @@ class JobDatabase(
db.execSQL("ALTER TABLE job_spec ADD COLUMN next_backoff_interval INTEGER")
db.execSQL("UPDATE job_spec SET last_run_attempt_time = 0")
}
if (oldVersion < 3) {
db.execSQL("ALTER TABLE job_spec ADD COLUMN priority INTEGER DEFAULT 0")
}
}
override fun onOpen(db: SQLiteDatabase) {
@@ -189,7 +195,8 @@ class JobDatabase(
Jobs.LIFESPAN,
Jobs.SERIALIZED_DATA,
Jobs.SERIALIZED_INPUT_DATA,
Jobs.IS_RUNNING
Jobs.IS_RUNNING,
Jobs.PRIORITY
)
return readableDatabase
.query(Jobs.TABLE_NAME, columns, null, null, null, null, "${Jobs.CREATE_TIME}, ${Jobs.ID} ASC")
@@ -329,7 +336,8 @@ class JobDatabase(
Jobs.LIFESPAN to job.lifespan,
Jobs.SERIALIZED_DATA to job.serializedData,
Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData,
Jobs.IS_RUNNING to if (job.isRunning) 1 else 0
Jobs.IS_RUNNING to if (job.isRunning) 1 else 0,
Jobs.PRIORITY to job.priority
)
.run(SQLiteDatabase.CONFLICT_IGNORE)
}
@@ -378,7 +386,8 @@ class JobDatabase(
serializedData = cursor.requireBlob(Jobs.SERIALIZED_DATA),
serializedInputData = cursor.requireBlob(Jobs.SERIALIZED_INPUT_DATA),
isRunning = cursor.requireBoolean(Jobs.IS_RUNNING),
isMemoryOnly = false
isMemoryOnly = false,
priority = cursor.requireInt(Jobs.PRIORITY)
)
}
@@ -416,7 +425,7 @@ class JobDatabase(
companion object {
private val TAG = Log.tag(JobDatabase::class.java)
private const val DATABASE_VERSION = 2
private const val DATABASE_VERSION = 3
private const val DATABASE_NAME = "signal-jobmanager.db"
@SuppressLint("StaticFieldLeak")

View File

@@ -268,6 +268,10 @@ public abstract class Job {
public static final long IMMORTAL = -1;
public static final int UNLIMITED = -1;
public static final int PRIORITY_DEFAULT = 0;
public static final int PRIORITY_HIGH = 1;
public static final int PRIORITY_LOW = -1;
private final String id;
private final long createTime;
private final long lifespan;
@@ -278,6 +282,7 @@ public abstract class Job {
private final List<String> constraintKeys;
private final byte[] inputData;
private final boolean memoryOnly;
private final int priority;
private Parameters(@NonNull String id,
long createTime,
@@ -288,7 +293,8 @@ public abstract class Job {
@Nullable String queue,
@NonNull List<String> constraintKeys,
@Nullable byte[] inputData,
boolean memoryOnly)
boolean memoryOnly,
int priority)
{
this.id = id;
this.createTime = createTime;
@@ -300,6 +306,7 @@ public abstract class Job {
this.constraintKeys = constraintKeys;
this.inputData = inputData;
this.memoryOnly = memoryOnly;
this.priority = priority;
}
@NonNull String getId() {
@@ -342,8 +349,12 @@ public abstract class Job {
return memoryOnly;
}
int getPriority() {
return priority;
}
public Builder toBuilder() {
return new Builder(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
return new Builder(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly, priority);
}
@@ -358,13 +369,14 @@ public abstract class Job {
private List<String> constraintKeys;
private byte[] inputData;
private boolean memoryOnly;
private int priority;
public Builder() {
this(UUID.randomUUID().toString());
}
Builder(@NonNull String id) {
this(id, System.currentTimeMillis(), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false);
this(id, System.currentTimeMillis(), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false, Parameters.PRIORITY_DEFAULT);
}
private Builder(@NonNull String id,
@@ -376,7 +388,8 @@ public abstract class Job {
@Nullable String queue,
@NonNull List<String> constraintKeys,
@Nullable byte[] inputData,
boolean memoryOnly)
boolean memoryOnly,
int priority)
{
this.id = id;
this.createTime = createTime;
@@ -388,6 +401,7 @@ public abstract class Job {
this.constraintKeys = constraintKeys;
this.inputData = inputData;
this.memoryOnly = memoryOnly;
this.priority = priority;
}
/** Should only be invoked by {@link JobController} */
@@ -481,6 +495,26 @@ public abstract class Job {
return this;
}
/**
* Sets the job's priority. Higher numbers are higher priority. Use the constants {@link Parameters#PRIORITY_HIGH}, {@link Parameters#PRIORITY_LOW},
* and {@link Parameters#PRIORITY_DEFAULT}. Defaults to {@link Parameters#PRIORITY_DEFAULT}.
*
* Priority determines the order jobs are run. In general, higher priority jobs run first. When deciding which job to run within a queue, we will always
* run the oldest job that has the highest priority. For example, if the highest priority in the queue is {@link Parameters#PRIORITY_DEFAULT}, then we'll
* run the oldest job with that priority, ignoring lower-priority jobs.
*
* Given all of the jobs that are eligible in each queue, we will do the same sort again to determine which job to run next. We will run the oldest job
* that has the highest priority among those eligible to be run.
*
* This creates the property that the only time a low-priority job will be run is if all other higher-priority jobs have been run already. Be considerate
* of this, as it provides the potential for lower-priority jobs to be extremely delayed if higher-priority jobs are being consistently enqueued at the
* same time.
*/
public @NonNull Builder setPriority(int priority) {
this.priority = priority;
return this;
}
/**
* Sets the input data that will be made available to the job when it is run.
* Should only be set by {@link JobController}.
@@ -491,7 +525,7 @@ public abstract class Job {
}
public @NonNull Parameters build() {
return new Parameters(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
return new Parameters(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly, priority);
}
}
}

View File

@@ -450,7 +450,8 @@ class JobController {
job.serialize(),
null,
false,
job.getParameters().isMemoryOnly());
job.getParameters().isMemoryOnly(),
job.getParameters().getPriority());
List<ConstraintSpec> constraintSpecs = Stream.of(job.getParameters().getConstraintKeys())
.map(key -> new ConstraintSpec(jobSpec.getId(), key, jobSpec.isMemoryOnly()))
@@ -554,7 +555,8 @@ class JobController {
jobSpec.getSerializedData(),
inputData,
jobSpec.isRunning(),
jobSpec.isMemoryOnly());
jobSpec.isMemoryOnly(),
jobSpec.getPriority());
}
interface Callback {

View File

@@ -73,7 +73,8 @@ public class JobMigrator {
updatedJobData.getData(),
jobSpec.getSerializedInputData(),
jobSpec.isRunning(),
jobSpec.isMemoryOnly());
jobSpec.isMemoryOnly(),
jobSpec.getPriority());
iter.set(updatedJobSpec);
}

View File

@@ -13,7 +13,8 @@ data class JobSpec(
val serializedData: ByteArray?,
val serializedInputData: ByteArray?,
val isRunning: Boolean,
val isMemoryOnly: Boolean
val isMemoryOnly: Boolean,
val priority: Int
) {
fun withNextBackoffInterval(updated: Long): JobSpec {

View File

@@ -69,8 +69,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
it.queueKey ?: it.id
}
.map { byQueueKey: Map.Entry<String, List<JobSpec>> ->
// Find the oldest job in each queue
byQueueKey.value.minByOrNull { it.createTime }
// We want to find the next job we should run within each queue. It should be the oldest job within the group of jobs with the highest priority.
// We can get this by sorting by createTime, then taking first job in that list that has the max priority.
byQueueKey.value
.sortedBy { it.createTime }
.maxByOrNull { it.priority }
}
.filterNotNull()
.filter { job ->
@@ -80,6 +83,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
.filterNot { it.isRunning }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.sortedBy { it.createTime }
.sortedByDescending { it.priority }
// Note: The priority sort at the end is safe because it's stable. That means that within jobs with the same priority, they will still be sorted by createTime.
}
}

View File

@@ -88,7 +88,7 @@ public class JobMigratorTest {
private static JobStorage simpleJobStorage() {
JobStorage jobStorage = mock(JobStorage.class);
when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false))));
when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false, 0))));
return jobStorage;
}

View File

@@ -335,6 +335,45 @@ class FastJobStorageTest {
jobs[0].id assertIs "1"
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - first item in queue with priority`() {
val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", createTime = 1, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList())
val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = "q", createTime = 2, priority = Job.Parameters.PRIORITY_HIGH), emptyList(), emptyList())
val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3", queueKey = "q", createTime = 3, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList())
val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec1, fullSpec2, fullSpec3)))
subject.init()
val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10)
jobs.size assertIs 1
jobs[0].id assertIs "2"
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - complex priority`() {
val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q1", createTime = 1, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList())
val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = "q1", createTime = 2, priority = Job.Parameters.PRIORITY_HIGH), emptyList(), emptyList())
val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3", queueKey = "q2", createTime = 3, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList())
val fullSpec4 = FullSpec(jobSpec(id = "4", factoryKey = "f4", queueKey = "q2", createTime = 4, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList())
val fullSpec5 = FullSpec(jobSpec(id = "5", factoryKey = "f5", queueKey = "q3", createTime = 5, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList())
val fullSpec6 = FullSpec(jobSpec(id = "6", factoryKey = "f6", queueKey = "q3", createTime = 6, priority = Job.Parameters.PRIORITY_HIGH), emptyList(), emptyList())
val fullSpec7 = FullSpec(jobSpec(id = "7", factoryKey = "f7", queueKey = "q4", createTime = 7, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList())
val fullSpec8 = FullSpec(jobSpec(id = "8", factoryKey = "f8", queueKey = null, createTime = 8, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList())
val fullSpec9 = FullSpec(jobSpec(id = "9", factoryKey = "f9", queueKey = null, createTime = 9, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList())
val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec1, fullSpec2, fullSpec3, fullSpec4, fullSpec5, fullSpec6, fullSpec7, fullSpec8, fullSpec9)))
subject.init()
val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10)
jobs.size assertIs 6
jobs[0].id assertIs "2"
jobs[1].id assertIs "6"
jobs[2].id assertIs "3"
jobs[3].id assertIs "9"
jobs[4].id assertIs "7"
jobs[5].id assertIs "8"
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - lastRunAttemptTime in the future runs right away`() {
val currentTime = 10L
@@ -581,7 +620,8 @@ class FastJobStorageTest {
serializedData: ByteArray? = null,
serializedInputData: ByteArray? = null,
isRunning: Boolean = false,
isMemoryOnly: Boolean = false
isMemoryOnly: Boolean = false,
priority: Int = 0
): JobSpec {
return JobSpec(
id = id,
@@ -596,7 +636,8 @@ class FastJobStorageTest {
serializedData = serializedData,
serializedInputData = serializedInputData,
isRunning = isRunning,
isMemoryOnly = isMemoryOnly
isMemoryOnly = isMemoryOnly,
priority = priority
)
}
@@ -614,7 +655,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false
isMemoryOnly = false,
priority = 0
)
val JOB_2 = JobSpec(
id = "id2",
@@ -629,7 +671,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false
isMemoryOnly = false,
priority = 0
)
val JOB_3 = JobSpec(
id = "id3",
@@ -644,7 +687,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false
isMemoryOnly = false,
priority = 0
)
val CONSTRAINT_1 = ConstraintSpec(jobSpecId = "id1", factoryKey = "f1", isMemoryOnly = false)
@@ -691,7 +735,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = true
isMemoryOnly = true,
priority = 0
)
val CONSTRAINT_1 = ConstraintSpec(jobSpecId = "id1", factoryKey = "f1", isMemoryOnly = true)
val FULL_SPEC_1 = FullSpec(JOB_1, listOf(CONSTRAINT_1), emptyList())
@@ -712,7 +757,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false
isMemoryOnly = false,
priority = 0
)
val JOB_2 = JobSpec(
id = "id2",
@@ -727,7 +773,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false
isMemoryOnly = false,
priority = 0
)
val JOB_3 = JobSpec(
id = "id3",
@@ -742,7 +789,8 @@ class FastJobStorageTest {
serializedData = null,
serializedInputData = null,
isRunning = false,
isMemoryOnly = false
isMemoryOnly = false,
priority = 0
)
val DEPENDENCY_1 = DependencySpec(jobId = "id1", dependsOnJobId = "id2", isMemoryOnly = false)