diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt index 158422d728..1eca5b4df6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt @@ -59,6 +59,7 @@ class JobDatabase( const val SERIALIZED_DATA = "serialized_data" const val SERIALIZED_INPUT_DATA = "serialized_input_data" const val IS_RUNNING = "is_running" + const val PRIORITY = "priority" val CREATE_TABLE = """ @@ -75,7 +76,8 @@ class JobDatabase( $SERIALIZED_DATA TEXT, $SERIALIZED_INPUT_DATA TEXT DEFAULT NULL, $IS_RUNNING INTEGER, - $NEXT_BACKOFF_INTERVAL INTEGER + $NEXT_BACKOFF_INTERVAL INTEGER, + $PRIORITY INTEGER DEFAULT 0 ) """.trimIndent() } @@ -145,6 +147,10 @@ class JobDatabase( db.execSQL("ALTER TABLE job_spec ADD COLUMN next_backoff_interval INTEGER") db.execSQL("UPDATE job_spec SET last_run_attempt_time = 0") } + + if (oldVersion < 3) { + db.execSQL("ALTER TABLE job_spec ADD COLUMN priority INTEGER DEFAULT 0") + } } override fun onOpen(db: SQLiteDatabase) { @@ -189,7 +195,8 @@ class JobDatabase( Jobs.LIFESPAN, Jobs.SERIALIZED_DATA, Jobs.SERIALIZED_INPUT_DATA, - Jobs.IS_RUNNING + Jobs.IS_RUNNING, + Jobs.PRIORITY ) return readableDatabase .query(Jobs.TABLE_NAME, columns, null, null, null, null, "${Jobs.CREATE_TIME}, ${Jobs.ID} ASC") @@ -329,7 +336,8 @@ class JobDatabase( Jobs.LIFESPAN to job.lifespan, Jobs.SERIALIZED_DATA to job.serializedData, Jobs.SERIALIZED_INPUT_DATA to job.serializedInputData, - Jobs.IS_RUNNING to if (job.isRunning) 1 else 0 + Jobs.IS_RUNNING to if (job.isRunning) 1 else 0, + Jobs.PRIORITY to job.priority ) .run(SQLiteDatabase.CONFLICT_IGNORE) } @@ -378,7 +386,8 @@ class JobDatabase( serializedData = cursor.requireBlob(Jobs.SERIALIZED_DATA), serializedInputData = cursor.requireBlob(Jobs.SERIALIZED_INPUT_DATA), isRunning = cursor.requireBoolean(Jobs.IS_RUNNING), - isMemoryOnly = false + isMemoryOnly = false, + priority = cursor.requireInt(Jobs.PRIORITY) ) } @@ -416,7 +425,7 @@ class JobDatabase( companion object { private val TAG = Log.tag(JobDatabase::class.java) - private const val DATABASE_VERSION = 2 + private const val DATABASE_VERSION = 3 private const val DATABASE_NAME = "signal-jobmanager.db" @SuppressLint("StaticFieldLeak") diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java index 6f3304c90b..f1da3970e2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java @@ -268,6 +268,10 @@ public abstract class Job { public static final long IMMORTAL = -1; public static final int UNLIMITED = -1; + public static final int PRIORITY_DEFAULT = 0; + public static final int PRIORITY_HIGH = 1; + public static final int PRIORITY_LOW = -1; + private final String id; private final long createTime; private final long lifespan; @@ -278,6 +282,7 @@ public abstract class Job { private final List constraintKeys; private final byte[] inputData; private final boolean memoryOnly; + private final int priority; private Parameters(@NonNull String id, long createTime, @@ -288,7 +293,8 @@ public abstract class Job { @Nullable String queue, @NonNull List constraintKeys, @Nullable byte[] inputData, - boolean memoryOnly) + boolean memoryOnly, + int priority) { this.id = id; this.createTime = createTime; @@ -300,6 +306,7 @@ public abstract class Job { this.constraintKeys = constraintKeys; this.inputData = inputData; this.memoryOnly = memoryOnly; + this.priority = priority; } @NonNull String getId() { @@ -342,8 +349,12 @@ public abstract class Job { return memoryOnly; } + int getPriority() { + return priority; + } + public Builder toBuilder() { - return new Builder(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly); + return new Builder(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly, priority); } @@ -358,13 +369,14 @@ public abstract class Job { private List constraintKeys; private byte[] inputData; private boolean memoryOnly; + private int priority; public Builder() { this(UUID.randomUUID().toString()); } Builder(@NonNull String id) { - this(id, System.currentTimeMillis(), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false); + this(id, System.currentTimeMillis(), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false, Parameters.PRIORITY_DEFAULT); } private Builder(@NonNull String id, @@ -376,7 +388,8 @@ public abstract class Job { @Nullable String queue, @NonNull List constraintKeys, @Nullable byte[] inputData, - boolean memoryOnly) + boolean memoryOnly, + int priority) { this.id = id; this.createTime = createTime; @@ -388,6 +401,7 @@ public abstract class Job { this.constraintKeys = constraintKeys; this.inputData = inputData; this.memoryOnly = memoryOnly; + this.priority = priority; } /** Should only be invoked by {@link JobController} */ @@ -481,6 +495,26 @@ public abstract class Job { return this; } + /** + * Sets the job's priority. Higher numbers are higher priority. Use the constants {@link Parameters#PRIORITY_HIGH}, {@link Parameters#PRIORITY_LOW}, + * and {@link Parameters#PRIORITY_DEFAULT}. Defaults to {@link Parameters#PRIORITY_DEFAULT}. + * + * Priority determines the order jobs are run. In general, higher priority jobs run first. When deciding which job to run within a queue, we will always + * run the oldest job that has the highest priority. For example, if the highest priority in the queue is {@link Parameters#PRIORITY_DEFAULT}, then we'll + * run the oldest job with that priority, ignoring lower-priority jobs. + * + * Given all of the jobs that are eligible in each queue, we will do the same sort again to determine which job to run next. We will run the oldest job + * that has the highest priority among those eligible to be run. + * + * This creates the property that the only time a low-priority job will be run is if all other higher-priority jobs have been run already. Be considerate + * of this, as it provides the potential for lower-priority jobs to be extremely delayed if higher-priority jobs are being consistently enqueued at the + * same time. + */ + public @NonNull Builder setPriority(int priority) { + this.priority = priority; + return this; + } + /** * Sets the input data that will be made available to the job when it is run. * Should only be set by {@link JobController}. @@ -491,7 +525,7 @@ public abstract class Job { } public @NonNull Parameters build() { - return new Parameters(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly); + return new Parameters(id, createTime, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly, priority); } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 7f989e8be0..73a068e546 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -450,7 +450,8 @@ class JobController { job.serialize(), null, false, - job.getParameters().isMemoryOnly()); + job.getParameters().isMemoryOnly(), + job.getParameters().getPriority()); List constraintSpecs = Stream.of(job.getParameters().getConstraintKeys()) .map(key -> new ConstraintSpec(jobSpec.getId(), key, jobSpec.isMemoryOnly())) @@ -554,7 +555,8 @@ class JobController { jobSpec.getSerializedData(), inputData, jobSpec.isRunning(), - jobSpec.isMemoryOnly()); + jobSpec.isMemoryOnly(), + jobSpec.getPriority()); } interface Callback { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java index 42fa30155e..77d3d936f7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java @@ -73,7 +73,8 @@ public class JobMigrator { updatedJobData.getData(), jobSpec.getSerializedInputData(), jobSpec.isRunning(), - jobSpec.isMemoryOnly()); + jobSpec.isMemoryOnly(), + jobSpec.getPriority()); iter.set(updatedJobSpec); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt index faf50672d6..b8049cece2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt @@ -13,7 +13,8 @@ data class JobSpec( val serializedData: ByteArray?, val serializedInputData: ByteArray?, val isRunning: Boolean, - val isMemoryOnly: Boolean + val isMemoryOnly: Boolean, + val priority: Int ) { fun withNextBackoffInterval(updated: Long): JobSpec { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt index ef8ffa3156..998f2ccc89 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt @@ -69,8 +69,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { it.queueKey ?: it.id } .map { byQueueKey: Map.Entry> -> - // Find the oldest job in each queue - byQueueKey.value.minByOrNull { it.createTime } + // We want to find the next job we should run within each queue. It should be the oldest job within the group of jobs with the highest priority. + // We can get this by sorting by createTime, then taking first job in that list that has the max priority. + byQueueKey.value + .sortedBy { it.createTime } + .maxByOrNull { it.priority } } .filterNotNull() .filter { job -> @@ -80,6 +83,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { .filterNot { it.isRunning } .filter { job -> job.hasEligibleRunTime(currentTime) } .sortedBy { it.createTime } + .sortedByDescending { it.priority } + + // Note: The priority sort at the end is safe because it's stable. That means that within jobs with the same priority, they will still be sorted by createTime. } } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java index 45dec578f0..b1a8f1ec30 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java +++ b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java @@ -88,7 +88,7 @@ public class JobMigratorTest { private static JobStorage simpleJobStorage() { JobStorage jobStorage = mock(JobStorage.class); - when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false)))); + when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false, 0)))); return jobStorage; } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt index 23c33b67b0..fc788d3d7c 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt @@ -335,6 +335,45 @@ class FastJobStorageTest { jobs[0].id assertIs "1" } + @Test + fun `getPendingJobsWithNoDependenciesInCreatedOrder - first item in queue with priority`() { + val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", createTime = 1, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList()) + val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = "q", createTime = 2, priority = Job.Parameters.PRIORITY_HIGH), emptyList(), emptyList()) + val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3", queueKey = "q", createTime = 3, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList()) + + val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec1, fullSpec2, fullSpec3))) + subject.init() + + val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10) + jobs.size assertIs 1 + jobs[0].id assertIs "2" + } + + @Test + fun `getPendingJobsWithNoDependenciesInCreatedOrder - complex priority`() { + val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q1", createTime = 1, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList()) + val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = "q1", createTime = 2, priority = Job.Parameters.PRIORITY_HIGH), emptyList(), emptyList()) + val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3", queueKey = "q2", createTime = 3, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList()) + val fullSpec4 = FullSpec(jobSpec(id = "4", factoryKey = "f4", queueKey = "q2", createTime = 4, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList()) + val fullSpec5 = FullSpec(jobSpec(id = "5", factoryKey = "f5", queueKey = "q3", createTime = 5, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList()) + val fullSpec6 = FullSpec(jobSpec(id = "6", factoryKey = "f6", queueKey = "q3", createTime = 6, priority = Job.Parameters.PRIORITY_HIGH), emptyList(), emptyList()) + val fullSpec7 = FullSpec(jobSpec(id = "7", factoryKey = "f7", queueKey = "q4", createTime = 7, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList()) + val fullSpec8 = FullSpec(jobSpec(id = "8", factoryKey = "f8", queueKey = null, createTime = 8, priority = Job.Parameters.PRIORITY_LOW), emptyList(), emptyList()) + val fullSpec9 = FullSpec(jobSpec(id = "9", factoryKey = "f9", queueKey = null, createTime = 9, priority = Job.Parameters.PRIORITY_DEFAULT), emptyList(), emptyList()) + + val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec1, fullSpec2, fullSpec3, fullSpec4, fullSpec5, fullSpec6, fullSpec7, fullSpec8, fullSpec9))) + subject.init() + + val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10) + jobs.size assertIs 6 + jobs[0].id assertIs "2" + jobs[1].id assertIs "6" + jobs[2].id assertIs "3" + jobs[3].id assertIs "9" + jobs[4].id assertIs "7" + jobs[5].id assertIs "8" + } + @Test fun `getPendingJobsWithNoDependenciesInCreatedOrder - lastRunAttemptTime in the future runs right away`() { val currentTime = 10L @@ -581,7 +620,8 @@ class FastJobStorageTest { serializedData: ByteArray? = null, serializedInputData: ByteArray? = null, isRunning: Boolean = false, - isMemoryOnly: Boolean = false + isMemoryOnly: Boolean = false, + priority: Int = 0 ): JobSpec { return JobSpec( id = id, @@ -596,7 +636,8 @@ class FastJobStorageTest { serializedData = serializedData, serializedInputData = serializedInputData, isRunning = isRunning, - isMemoryOnly = isMemoryOnly + isMemoryOnly = isMemoryOnly, + priority = priority ) } @@ -614,7 +655,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = false + isMemoryOnly = false, + priority = 0 ) val JOB_2 = JobSpec( id = "id2", @@ -629,7 +671,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = false + isMemoryOnly = false, + priority = 0 ) val JOB_3 = JobSpec( id = "id3", @@ -644,7 +687,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = false + isMemoryOnly = false, + priority = 0 ) val CONSTRAINT_1 = ConstraintSpec(jobSpecId = "id1", factoryKey = "f1", isMemoryOnly = false) @@ -691,7 +735,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = true + isMemoryOnly = true, + priority = 0 ) val CONSTRAINT_1 = ConstraintSpec(jobSpecId = "id1", factoryKey = "f1", isMemoryOnly = true) val FULL_SPEC_1 = FullSpec(JOB_1, listOf(CONSTRAINT_1), emptyList()) @@ -712,7 +757,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = false + isMemoryOnly = false, + priority = 0 ) val JOB_2 = JobSpec( id = "id2", @@ -727,7 +773,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = false + isMemoryOnly = false, + priority = 0 ) val JOB_3 = JobSpec( id = "id3", @@ -742,7 +789,8 @@ class FastJobStorageTest { serializedData = null, serializedInputData = null, isRunning = false, - isMemoryOnly = false + isMemoryOnly = false, + priority = 0 ) val DEPENDENCY_1 = DependencySpec(jobId = "id1", dependsOnJobId = "id2", isMemoryOnly = false)