From a911a007d279d84d69abff624ca3224915f706b9 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Wed, 30 Aug 2023 09:51:08 -0400 Subject: [PATCH] Change job scheduling to be relative rather than absolute. --- .../securesms/database/JobDatabase.kt | 39 ++-- .../securesms/jobmanager/Job.java | 20 +- .../securesms/jobmanager/JobController.java | 24 +-- .../securesms/jobmanager/JobMigrator.java | 3 +- .../jobmanager/persistence/JobSpec.kt | 15 +- .../jobmanager/persistence/JobStorage.java | 2 +- .../securesms/jobs/FastJobStorage.kt | 35 +++- .../securesms/ratelimit/RateLimitUtil.java | 4 +- .../securesms/jobmanager/JobMigratorTest.java | 2 +- .../securesms/jobs/FastJobStorageTest.kt | 176 +++++++++++------- 10 files changed, 199 insertions(+), 121 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt index 9da7433943..ea4263d12b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/JobDatabase.kt @@ -53,7 +53,8 @@ class JobDatabase( const val FACTORY_KEY = "factory_key" const val QUEUE_KEY = "queue_key" const val CREATE_TIME = "create_time" - const val NEXT_RUN_ATTEMPT_TIME = "next_run_attempt_time" + const val LAST_RUN_ATTEMPT_TIME = "last_run_attempt_time" + const val NEXT_BACKOFF_INTERVAL = "next_backoff_interval" const val RUN_ATTEMPT = "run_attempt" const val MAX_ATTEMPTS = "max_attempts" const val LIFESPAN = "lifespan" @@ -69,13 +70,14 @@ class JobDatabase( $FACTORY_KEY TEXT, $QUEUE_KEY TEXT, $CREATE_TIME INTEGER, - $NEXT_RUN_ATTEMPT_TIME INTEGER, + $LAST_RUN_ATTEMPT_TIME INTEGER, $RUN_ATTEMPT INTEGER, $MAX_ATTEMPTS INTEGER, $LIFESPAN INTEGER, $SERIALIZED_DATA TEXT, $SERIALIZED_INPUT_DATA TEXT DEFAULT NULL, - $IS_RUNNING INTEGER + $IS_RUNNING INTEGER, + $NEXT_BACKOFF_INTERVAL INTEGER ) """.trimIndent() } @@ -139,6 +141,12 @@ class JobDatabase( override fun onUpgrade(db: SQLiteDatabase, oldVersion: Int, newVersion: Int) { Log.i(TAG, "onUpgrade($oldVersion, $newVersion)") + + if (oldVersion < 2) { + db.execSQL("ALTER TABLE job_spec RENAME COLUMN next_run_attempt_time TO last_run_attempt_time") + db.execSQL("ALTER TABLE job_spec ADD COLUMN next_backoff_interval INTEGER") + db.execSQL("UPDATE job_spec SET last_run_attempt_time = 0") + } } override fun onOpen(db: SQLiteDatabase) { @@ -178,22 +186,25 @@ class JobDatabase( } @Synchronized - fun updateJobRunningState(id: String, isRunning: Boolean) { + fun markJobAsRunning(id: String, currentTime: Long) { writableDatabase .update(Jobs.TABLE_NAME) - .values(Jobs.IS_RUNNING to if (isRunning) 1 else 0) + .values( + Jobs.IS_RUNNING to 1, + Jobs.LAST_RUN_ATTEMPT_TIME to currentTime + ) .where("${Jobs.JOB_SPEC_ID} = ?", id) .run() } @Synchronized - fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextRunAttemptTime: Long, serializedData: ByteArray?) { + fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?) { writableDatabase .update(Jobs.TABLE_NAME) .values( Jobs.IS_RUNNING to if (isRunning) 1 else 0, Jobs.RUN_ATTEMPT to runAttempt, - Jobs.NEXT_RUN_ATTEMPT_TIME to nextRunAttemptTime, + Jobs.NEXT_BACKOFF_INTERVAL to nextBackoffInterval, Jobs.SERIALIZED_DATA to serializedData ) .where("${Jobs.JOB_SPEC_ID} = ?", id) @@ -224,7 +235,8 @@ class JobDatabase( Jobs.FACTORY_KEY to job.factoryKey, Jobs.QUEUE_KEY to job.queueKey, Jobs.CREATE_TIME to job.createTime, - Jobs.NEXT_RUN_ATTEMPT_TIME to job.nextRunAttemptTime, + Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime, + Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval, Jobs.RUN_ATTEMPT to job.runAttempt, Jobs.MAX_ATTEMPTS to job.maxAttempts, Jobs.LIFESPAN to job.lifespan, @@ -296,7 +308,8 @@ class JobDatabase( Jobs.FACTORY_KEY to job.factoryKey, Jobs.QUEUE_KEY to job.queueKey, Jobs.CREATE_TIME to job.createTime, - Jobs.NEXT_RUN_ATTEMPT_TIME to job.nextRunAttemptTime, + Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime, + Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval, Jobs.RUN_ATTEMPT to job.runAttempt, Jobs.MAX_ATTEMPTS to job.maxAttempts, Jobs.LIFESPAN to job.lifespan, @@ -343,7 +356,8 @@ class JobDatabase( factoryKey = cursor.requireNonNullString(Jobs.FACTORY_KEY), queueKey = cursor.requireString(Jobs.QUEUE_KEY), createTime = cursor.requireLong(Jobs.CREATE_TIME), - nextRunAttemptTime = cursor.requireLong(Jobs.NEXT_RUN_ATTEMPT_TIME), + lastRunAttemptTime = cursor.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME), + nextBackoffInterval = cursor.requireLong(Jobs.NEXT_BACKOFF_INTERVAL), runAttempt = cursor.requireInt(Jobs.RUN_ATTEMPT), maxAttempts = cursor.requireInt(Jobs.MAX_ATTEMPTS), lifespan = cursor.requireLong(Jobs.LIFESPAN), @@ -383,7 +397,7 @@ class JobDatabase( companion object { private val TAG = Log.tag(JobDatabase::class.java) - private const val DATABASE_VERSION = 1 + private const val DATABASE_VERSION = 2 private const val DATABASE_NAME = "signal-jobmanager.db" @SuppressLint("StaticFieldLeak") @@ -411,7 +425,8 @@ class JobDatabase( values.put(Jobs.FACTORY_KEY, CursorUtil.requireString(cursor, "factory_key")) values.put(Jobs.QUEUE_KEY, CursorUtil.requireString(cursor, "queue_key")) values.put(Jobs.CREATE_TIME, CursorUtil.requireLong(cursor, "create_time")) - values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, CursorUtil.requireLong(cursor, "next_run_attempt_time")) + values.put(Jobs.LAST_RUN_ATTEMPT_TIME, 0) + values.put(Jobs.NEXT_BACKOFF_INTERVAL, 0) values.put(Jobs.RUN_ATTEMPT, CursorUtil.requireInt(cursor, "run_attempt")) values.put(Jobs.MAX_ATTEMPTS, CursorUtil.requireInt(cursor, "max_attempts")) values.put(Jobs.LIFESPAN, CursorUtil.requireLong(cursor, "lifespan")) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java index cdf38aebed..6f3304c90b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java @@ -38,7 +38,8 @@ public abstract class Job { private final Parameters parameters; private int runAttempt; - private long nextRunAttemptTime; + private long lastRunAttemptTime; + private long nextBackoffInterval; private volatile boolean canceled; @@ -60,8 +61,12 @@ public abstract class Job { return runAttempt; } - public final long getNextRunAttemptTime() { - return nextRunAttemptTime; + public final long getLastRunAttemptTime() { + return lastRunAttemptTime; + } + + public final long getNextBackoffInterval() { + return nextBackoffInterval; } public final @Nullable byte[] getInputData() { @@ -86,8 +91,13 @@ public abstract class Job { } /** Should only be invoked by {@link JobController} */ - final void setNextRunAttemptTime(long nextRunAttemptTime) { - this.nextRunAttemptTime = nextRunAttemptTime; + final void setLastRunAttemptTime(long lastRunAttemptTime) { + this.lastRunAttemptTime = lastRunAttemptTime; + } + + /** Should only be invoked by {@link JobController} */ + final void setNextBackoffInterval(long nextBackoffInterval) { + this.nextBackoffInterval = nextBackoffInterval; } /** Should only be invoked by {@link JobController} */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 968fc60dcf..fb2d22c69a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -250,11 +250,10 @@ class JobController { throw new IllegalArgumentException("Invalid backoff interval! " + backoffInterval); } - int nextRunAttempt = job.getRunAttempt() + 1; - long nextRunAttemptTime = System.currentTimeMillis() + backoffInterval; - byte[] serializedData = job.serialize(); + int nextRunAttempt = job.getRunAttempt() + 1; + byte[] serializedData = job.serialize(); - jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData); + jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, backoffInterval, serializedData); jobTracker.onStateChange(job, JobTracker.JobState.PENDING); List constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId())) @@ -263,10 +262,8 @@ class JobController { .toList(); - long delay = Math.max(0, nextRunAttemptTime - System.currentTimeMillis()); - - Log.i(TAG, JobLogger.format(job, "Scheduling a retry in " + delay + " ms.")); - scheduler.schedule(delay, constraints); + Log.i(TAG, JobLogger.format(job, "Scheduling a retry in " + backoffInterval + " ms.")); + scheduler.schedule(backoffInterval, constraints); notifyAll(); } @@ -338,7 +335,7 @@ class JobController { wait(); } - jobStorage.updateJobRunningState(job.getId(), true); + jobStorage.markJobAsRunning(job.getId(), System.currentTimeMillis()); runningJobs.put(job.getId(), job); jobTracker.onStateChange(job, JobTracker.JobState.RUNNING); @@ -445,7 +442,8 @@ class JobController { job.getFactoryKey(), job.getParameters().getQueue(), System.currentTimeMillis(), - job.getNextRunAttemptTime(), + job.getLastRunAttemptTime(), + job.getNextBackoffInterval(), job.getRunAttempt(), job.getParameters().getMaxAttempts(), job.getParameters().getLifespan(), @@ -511,7 +509,8 @@ class JobController { Job job = jobInstantiator.instantiate(jobSpec.getFactoryKey(), parameters, jobSpec.getSerializedData()); job.setRunAttempt(jobSpec.getRunAttempt()); - job.setNextRunAttemptTime(jobSpec.getNextRunAttemptTime()); + job.setLastRunAttemptTime(jobSpec.getLastRunAttemptTime()); + job.setNextBackoffInterval(jobSpec.getNextBackoffInterval()); job.setContext(application); return job; @@ -547,7 +546,8 @@ class JobController { jobSpec.getFactoryKey(), jobSpec.getQueueKey(), jobSpec.getCreateTime(), - jobSpec.getNextRunAttemptTime(), + jobSpec.getLastRunAttemptTime(), + jobSpec.getNextBackoffInterval(), jobSpec.getRunAttempt(), jobSpec.getMaxAttempts(), jobSpec.getLifespan(), diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java index dc1f82ac8a..42fa30155e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobMigrator.java @@ -65,7 +65,8 @@ public class JobMigrator { updatedJobData.getFactoryKey(), updatedJobData.getQueueKey(), jobSpec.getCreateTime(), - jobSpec.getNextRunAttemptTime(), + jobSpec.getLastRunAttemptTime(), + jobSpec.getNextBackoffInterval(), jobSpec.getRunAttempt(), jobSpec.getMaxAttempts(), jobSpec.getLifespan(), diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt index 68fc43111d..faf50672d6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobSpec.kt @@ -5,7 +5,8 @@ data class JobSpec( val factoryKey: String, val queueKey: String?, val createTime: Long, - val nextRunAttemptTime: Long, + val lastRunAttemptTime: Long, + val nextBackoffInterval: Long, val runAttempt: Int, val maxAttempts: Int, val lifespan: Long, @@ -15,8 +16,8 @@ data class JobSpec( val isMemoryOnly: Boolean ) { - fun withNextRunAttemptTime(updated: Long): JobSpec { - return copy(nextRunAttemptTime = updated) + fun withNextBackoffInterval(updated: Long): JobSpec { + return copy(nextBackoffInterval = updated) } fun withData(updatedSerializedData: ByteArray?): JobSpec { @@ -24,7 +25,7 @@ data class JobSpec( } override fun toString(): String { - return "id: JOB::$id | factoryKey: $factoryKey | queueKey: $queueKey | createTime: $createTime | nextRunAttemptTime: $nextRunAttemptTime | runAttempt: $runAttempt | maxAttempts: $maxAttempts | lifespan: $lifespan | isRunning: $isRunning | memoryOnly: $isMemoryOnly" + return "id: JOB::$id | factoryKey: $factoryKey | queueKey: $queueKey | createTime: $createTime | lastRunAttemptTime: $lastRunAttemptTime | nextBackoffInterval: $nextBackoffInterval | runAttempt: $runAttempt | maxAttempts: $maxAttempts | lifespan: $lifespan | isRunning: $isRunning | memoryOnly: $isMemoryOnly" } override fun equals(other: Any?): Boolean { @@ -37,7 +38,8 @@ data class JobSpec( if (factoryKey != other.factoryKey) return false if (queueKey != other.queueKey) return false if (createTime != other.createTime) return false - if (nextRunAttemptTime != other.nextRunAttemptTime) return false + if (lastRunAttemptTime != other.lastRunAttemptTime) return false + if (nextBackoffInterval != other.nextBackoffInterval) return false if (runAttempt != other.runAttempt) return false if (maxAttempts != other.maxAttempts) return false if (lifespan != other.lifespan) return false @@ -60,7 +62,8 @@ data class JobSpec( result = 31 * result + factoryKey.hashCode() result = 31 * result + (queueKey?.hashCode() ?: 0) result = 31 * result + createTime.hashCode() - result = 31 * result + nextRunAttemptTime.hashCode() + result = 31 * result + lastRunAttemptTime.hashCode() + result = 31 * result + nextBackoffInterval.hashCode() result = 31 * result + runAttempt result = 31 * result + maxAttempts result = 31 * result + lifespan.hashCode() diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java index f8092d5d00..d52bcef2bf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/persistence/JobStorage.java @@ -37,7 +37,7 @@ public interface JobStorage { boolean areQueuesEmpty(@NonNull Set queueKeys); @WorkerThread - void updateJobRunningState(@NonNull String id, boolean isRunning); + void markJobAsRunning(@NonNull String id, long currentTime); @WorkerThread void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @Nullable byte[] serializedData); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt index 81feb40d6a..17f29e3a3e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/FastJobStorage.kt @@ -58,22 +58,27 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List { val migrationJob: JobSpec? = getMigrationJob() - return if (migrationJob != null && !migrationJob.isRunning && migrationJob.nextRunAttemptTime <= currentTime) { + return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) { listOf(migrationJob) } else if (migrationJob != null) { emptyList() } else { jobs - .groupBy { it.queueKey ?: it.id } + .groupBy { + // Group together by queue. If it doesn't have a queue, we just use the ID, since it's unique and will give us all of the jobs without queues. + it.queueKey ?: it.id + } .map { byQueueKey: Map.Entry> -> + // Find the oldest job in each queue byQueueKey.value.minByOrNull { it.createTime } } .filterNotNull() .filter { job -> + // Filter out all jobs with unmet dependencies dependenciesByJobId[job.id].isNullOrEmpty() } .filterNot { it.isRunning } - .filter { job -> job.nextRunAttemptTime <= currentTime } + .filter { job -> job.hasEligibleRunTime(currentTime) } .sortedBy { it.createTime } } } @@ -123,10 +128,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { } @Synchronized - override fun updateJobRunningState(id: String, isRunning: Boolean) { + override fun markJobAsRunning(id: String, currentTime: Long) { val job: JobSpec? = getJobById(id) if (job == null || !job.isMemoryOnly) { - jobDatabase.updateJobRunningState(id, isRunning) + jobDatabase.markJobAsRunning(id, currentTime) } val iter = jobs.listIterator() @@ -134,16 +139,21 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { while (iter.hasNext()) { val current: JobSpec = iter.next() if (current.id == id) { - iter.set(current.copy(isRunning = isRunning)) + iter.set( + current.copy( + isRunning = true, + lastRunAttemptTime = currentTime + ) + ) } } } @Synchronized - override fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextRunAttemptTime: Long, serializedData: ByteArray?) { + override fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?) { val job = getJobById(id) if (job == null || !job.isMemoryOnly) { - jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData) + jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextBackoffInterval, serializedData) } val iter = jobs.listIterator() @@ -154,7 +164,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { current.copy( isRunning = isRunning, runAttempt = runAttempt, - nextRunAttemptTime = nextRunAttemptTime, + nextBackoffInterval = nextBackoffInterval, serializedData = serializedData ) ) @@ -304,4 +314,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage { return dependsOnJob.createTime > job.createTime } + + /** + * Whether or not the job's eligible to be run based off of it's [Job.nextBackoffInterval] and other properties. + */ + private fun JobSpec.hasEligibleRunTime(currentTime: Long): Boolean { + return this.lastRunAttemptTime > currentTime || (this.lastRunAttemptTime + this.nextBackoffInterval) < currentTime + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/ratelimit/RateLimitUtil.java b/app/src/main/java/org/thoughtcrime/securesms/ratelimit/RateLimitUtil.java index 105487dfbf..56699fd56c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ratelimit/RateLimitUtil.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ratelimit/RateLimitUtil.java @@ -37,9 +37,9 @@ public final class RateLimitUtil { ApplicationDependencies.getJobManager().update((job) -> { if (job.getFactoryKey().equals(IndividualSendJob.KEY) && messageIds.contains(IndividualSendJob.getMessageId(job.getSerializedData()))) { - return job.withNextRunAttemptTime(System.currentTimeMillis()); + return job.withNextBackoffInterval(0); } else if (job.getFactoryKey().equals(PushGroupSendJob.KEY) && messageIds.contains(PushGroupSendJob.getMessageId(job.getSerializedData()))) { - return job.withNextRunAttemptTime(System.currentTimeMillis()); + return job.withNextBackoffInterval(0); } else { return job; } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java index 31ac6e2a28..45dec578f0 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java +++ b/app/src/test/java/org/thoughtcrime/securesms/jobmanager/JobMigratorTest.java @@ -88,7 +88,7 @@ public class JobMigratorTest { private static JobStorage simpleJobStorage() { JobStorage jobStorage = mock(JobStorage.class); - when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, null, null, false, false)))); + when(jobStorage.getAllJobSpecs()).thenReturn(new ArrayList<>(Collections.singletonList(new JobSpec("1", "f1", null, 1, 1, 1, 1, 1, 1, null, null, false, false)))); return jobStorage; } diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt index 0d3b6b096e..8c918ccff4 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/FastJobStorageTest.kt @@ -11,11 +11,10 @@ import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.testutil.TestHelpers import java.nio.charset.Charset -import java.util.Arrays class FastJobStorageTest { @Test - fun init_allStoredDataAvailable() { + fun `init - all stored data available`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -25,7 +24,7 @@ class FastJobStorageTest { } @Test - fun init_removesCircularDependencies() { + fun `init - removes circular dependencies`() { val subject = FastJobStorage(fixedDataDatabase(DataSetCircularDependency.FULL_SPECS)) subject.init() @@ -35,7 +34,7 @@ class FastJobStorageTest { } @Test - fun insertJobs_writesToDatabase() { + fun `insertJobs - writes to database`() { val database = noopDatabase() val subject = FastJobStorage(database) @@ -45,7 +44,7 @@ class FastJobStorageTest { } @Test - fun insertJobs_memoryOnlyJob_doesNotWriteToDatabase() { + fun `insertJobs - memory-only job does not write to database`() { val database = noopDatabase() val subject = FastJobStorage(database) @@ -55,7 +54,7 @@ class FastJobStorageTest { } @Test - fun insertJobs_dataCanBeFound() { + fun `insertJobs - data can be found`() { val subject = FastJobStorage(noopDatabase()) subject.insertJobs(DataSet1.FULL_SPECS) DataSet1.assertJobsMatch(subject.allJobSpecs) @@ -64,7 +63,7 @@ class FastJobStorageTest { } @Test - fun insertJobs_individualJobCanBeFound() { + fun `insertJobs - individual job can be found`() { val subject = FastJobStorage(noopDatabase()) subject.insertJobs(DataSet1.FULL_SPECS) @@ -73,7 +72,7 @@ class FastJobStorageTest { } @Test - fun updateAllJobsToBePending_writesToDatabase() { + fun `updateAllJobsToBePending - writes to database`() { val database = noopDatabase() val subject = FastJobStorage(database) subject.updateAllJobsToBePending() @@ -81,11 +80,11 @@ class FastJobStorageTest { } @Test - fun updateAllJobsToBePending_allArePending() { + fun `updateAllJobsToBePending - all are pending`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", isRunning = true), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", isRunning = true), emptyList(), emptyList()) - val subject = FastJobStorage(fixedDataDatabase(Arrays.asList(fullSpec1, fullSpec2))) + val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec1, fullSpec2))) subject.init() subject.updateAllJobsToBePending() @@ -94,7 +93,7 @@ class FastJobStorageTest { } @Test - fun updateJobs_writesToDatabase() { + fun `updateJobs - writes to database`() { val database = fixedDataDatabase(DataSet1.FULL_SPECS) val jobs = listOf(jobSpec(id = "id1", factoryKey = "f1")) @@ -106,7 +105,7 @@ class FastJobStorageTest { } @Test - fun updateJobs_memoryOnly_doesNotWriteToDatabase() { + fun `updateJobs - memory-only job does not write to database`() { val database = fixedDataDatabase(DataSetMemory.FULL_SPECS) val jobs = listOf(jobSpec(id = "id1", factoryKey = "f1")) @@ -118,7 +117,7 @@ class FastJobStorageTest { } @Test - fun updateJobs_updatesAllFields() { + fun `updateJobs - updates all fields`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1"), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2"), emptyList(), emptyList()) val fullSpec3 = FullSpec(jobSpec(id = "3", factoryKey = "f3"), emptyList(), emptyList()) @@ -128,7 +127,8 @@ class FastJobStorageTest { factoryKey = "g1", queueKey = "q1", createTime = 2, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 2, runAttempt = 2, maxAttempts = 2, lifespan = 2, @@ -142,7 +142,8 @@ class FastJobStorageTest { factoryKey = "g2", queueKey = "q2", createTime = 3, - nextRunAttemptTime = 3, + lastRunAttemptTime = 3, + nextBackoffInterval = 3, runAttempt = 3, maxAttempts = 3, lifespan = 3, @@ -162,31 +163,30 @@ class FastJobStorageTest { } @Test - fun updateJobRunningState_writesToDatabase() { + fun `markJobAsRunning - writes to database`() { val database = fixedDataDatabase(DataSet1.FULL_SPECS) val subject = FastJobStorage(database) subject.init() - subject.updateJobRunningState(id = "id1", isRunning = true) + subject.markJobAsRunning(id = "id1", currentTime = 42) - Mockito.verify(database).updateJobRunningState("id1", true) + Mockito.verify(database).markJobAsRunning(id = "id1", currentTime = 42) } @Test - fun updateJobRunningState_stateUpdated() { + fun `markJobAsRunning - state updated`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() - subject.updateJobRunningState(id = DataSet1.JOB_1.id, isRunning = true) - subject.getJobSpec(DataSet1.JOB_1.id)!!.isRunning assertIs true + subject.markJobAsRunning(id = DataSet1.JOB_1.id, currentTime = 42) - subject.updateJobRunningState(id = DataSet1.JOB_1.id, isRunning = false) - subject.getJobSpec(DataSet1.JOB_1.id)!!.isRunning assertIs false + subject.getJobSpec(DataSet1.JOB_1.id)!!.isRunning assertIs true + subject.getJobSpec(DataSet1.JOB_1.id)!!.lastRunAttemptTime assertIs 42 } @Test - fun updateJobAfterRetry_writesToDatabase() { + fun `updateJobAfterRetry - writes to database`() { val database = fixedDataDatabase(DataSet1.FULL_SPECS) val subject = FastJobStorage(database) @@ -196,15 +196,15 @@ class FastJobStorageTest { id = "id1", isRunning = true, runAttempt = 1, - nextRunAttemptTime = 10, + nextBackoffInterval = 10, serializedData = "a".toByteArray() ) - Mockito.verify(database).updateJobAfterRetry("id1", true, 1, 10, "a".toByteArray()) + Mockito.verify(database).updateJobAfterRetry(id = "id1", isRunning = true, runAttempt = 1, nextBackoffInterval = 10, serializedData = "a".toByteArray()) } @Test - fun updateJobAfterRetry_memoryOnly_doesNotWriteToDatabase() { + fun `updateJobAfterRetry - memory-only job does not write to database`() { val database = fixedDataDatabase(DataSetMemory.FULL_SPECS) val subject = FastJobStorage(database) @@ -214,32 +214,38 @@ class FastJobStorageTest { id = "id1", isRunning = true, runAttempt = 1, - nextRunAttemptTime = 10, + nextBackoffInterval = 10, serializedData = "a".toByteArray() ) - Mockito.verify(database, Mockito.times(0)).updateJobAfterRetry("id1", true, 1, 10, "a".toByteArray()) + Mockito.verify(database, Mockito.times(0)).updateJobAfterRetry(id = "id1", isRunning = true, runAttempt = 1, nextBackoffInterval = 10, serializedData = "a".toByteArray()) } @Test - fun updateJobAfterRetry_stateUpdated() { + fun `updateJobAfterRetry - state updated`() { val fullSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", isRunning = true), emptyList(), emptyList()) val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec))) subject.init() - subject.updateJobAfterRetry("1", false, 1, 10, "a".toByteArray()) + subject.updateJobAfterRetry( + id = "1", + isRunning = false, + runAttempt = 1, + nextBackoffInterval = 10, + serializedData = "a".toByteArray() + ) val job = subject.getJobSpec("1") check(job != null) job.isRunning assertIs false job.runAttempt assertIs 1 - job.nextRunAttemptTime assertIs 10 + job.nextBackoffInterval assertIs 10 job.serializedData!!.toString(Charset.defaultCharset()) assertIs "a" } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenEarlierItemInQueueInRunning() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - none when earlier item in queue is running`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", isRunning = true), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = "q"), emptyList(), emptyList()) @@ -250,7 +256,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenAllJobsAreRunning() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - none when all jobs are running`() { val fullSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", isRunning = true), emptyList(), emptyList()) val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec))) @@ -260,17 +266,18 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenNextRunTimeIsAfterCurrentTime() { - val fullSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", nextRunAttemptTime = 10), emptyList(), emptyList()) + fun `getPendingJobsWithNoDependenciesInCreatedOrder - none when next run time is after current time`() { + val currentTime = 0L + val fullSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", lastRunAttemptTime = 0, nextBackoffInterval = 10), emptyList(), emptyList()) val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec))) subject.init() - subject.getPendingJobsWithNoDependenciesInCreatedOrder(0).size assertIs 0 + subject.getPendingJobsWithNoDependenciesInCreatedOrder(currentTime).size assertIs 0 } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_noneWhenDependentOnAnotherJob() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - none when dependent on another job`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", isRunning = true), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2"), emptyList(), listOf(DependencySpec("2", "1", false))) @@ -281,7 +288,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_singleEligibleJob() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - single eligible job`() { val fullSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q"), emptyList(), emptyList()) val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec))) @@ -291,7 +298,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_multipleEligibleJobs() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - multiple eligible jobs`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1"), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2"), emptyList(), emptyList()) @@ -302,7 +309,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_singleEligibleJobInMixedList() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - single eligible job in mixed list`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", isRunning = true), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2"), emptyList(), emptyList()) @@ -315,7 +322,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_firstItemInQueue() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - first item in queue`() { val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q"), emptyList(), emptyList()) val fullSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = "q"), emptyList(), emptyList()) @@ -328,7 +335,21 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_migrationJobTakesPrecedence() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - lastRunAttemptTime in the future runs right away`() { + val currentTime = 10L + + val fullSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", lastRunAttemptTime = 100, nextBackoffInterval = 5), emptyList(), emptyList()) + + val subject = FastJobStorage(fixedDataDatabase(listOf(fullSpec1))) + subject.init() + + val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(currentTime) + jobs.size assertIs 1 + jobs[0].id assertIs "1" + } + + @Test + fun `getPendingJobsWithNoDependenciesInCreatedOrder - migration job takes precedence`() { val plainSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", createTime = 0), emptyList(), emptyList()) val migrationSpec = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 5), emptyList(), emptyList()) @@ -341,7 +362,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_runningMigrationBlocksNormalJobs() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - running migration blocks normal jobs`() { val plainSpec = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = "q", createTime = 0), emptyList(), emptyList()) val migrationSpec = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 5, isRunning = true), emptyList(), emptyList()) @@ -353,7 +374,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_runningMigrationBlocksLaterMigrationJobs() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - running migration blocks later migration jobs`() { val migrationSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 0, isRunning = true), emptyList(), emptyList()) val migrationSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 5), emptyList(), emptyList()) @@ -365,7 +386,7 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_onlyReturnFirstEligibleMigrationJob() { + fun `getPendingJobsWithNoDependenciesInCreatedOrder - only return first eligible migration job`() { val migrationSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 0), emptyList(), emptyList()) val migrationSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 5), emptyList(), emptyList()) @@ -378,19 +399,21 @@ class FastJobStorageTest { } @Test - fun getPendingJobsWithNoDependenciesInCreatedOrder_onlyMigrationJobWithAppropriateNextRunTime() { - val migrationSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 0, nextRunAttemptTime = 999), emptyList(), emptyList()) - val migrationSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 5, nextRunAttemptTime = 0), emptyList(), emptyList()) + fun `getPendingJobsWithNoDependenciesInCreatedOrder - migration job that isn't scheduled to run yet blocks later migration jobs`() { + val currentTime = 10L + + val migrationSpec1 = FullSpec(jobSpec(id = "1", factoryKey = "f1", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 0, lastRunAttemptTime = 0, nextBackoffInterval = 999), emptyList(), emptyList()) + val migrationSpec2 = FullSpec(jobSpec(id = "2", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY, createTime = 5, lastRunAttemptTime = 0, nextBackoffInterval = 0), emptyList(), emptyList()) val subject = FastJobStorage(fixedDataDatabase(listOf(migrationSpec1, migrationSpec2))) subject.init() - val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(10) + val jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(currentTime) jobs.size assertIs 0 } @Test - fun deleteJobs_writesToDatabase() { + fun `deleteJobs - writes to database`() { val database = fixedDataDatabase(DataSet1.FULL_SPECS) val ids: List = listOf("id1", "id2") @@ -403,7 +426,7 @@ class FastJobStorageTest { } @Test - fun deleteJobs_memoryOnly_doesNotWriteToDatabase() { + fun `deleteJobs - memory-only job does not write to database`() { val database = fixedDataDatabase(DataSetMemory.FULL_SPECS) val ids = listOf("id1") @@ -416,7 +439,7 @@ class FastJobStorageTest { } @Test - fun deleteJobs_deletesAllRelevantPieces() { + fun `deleteJobs - deletes all relevant pieces`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -435,7 +458,7 @@ class FastJobStorageTest { } @Test - fun getDependencySpecsThatDependOnJob_startOfChain() { + fun `getDependencySpecsThatDependOnJob - start of chain`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -446,7 +469,7 @@ class FastJobStorageTest { } @Test - fun getDependencySpecsThatDependOnJob_midChain() { + fun `getDependencySpecsThatDependOnJob - mid-chain`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -456,7 +479,7 @@ class FastJobStorageTest { } @Test - fun getDependencySpecsThatDependOnJob_endOfChain() { + fun `getDependencySpecsThatDependOnJob - end of chain`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -465,7 +488,7 @@ class FastJobStorageTest { } @Test - fun getJobsInQueue_empty() { + fun `getJobsInQueue - empty`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -474,7 +497,7 @@ class FastJobStorageTest { } @Test - fun getJobsInQueue_singleJob() { + fun `getJobsInQueue - single job`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -484,7 +507,7 @@ class FastJobStorageTest { } @Test - fun getJobCountForFactory_general() { + fun `getJobCountForFactory - general`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -493,7 +516,7 @@ class FastJobStorageTest { } @Test - fun getJobCountForFactoryAndQueue_general() { + fun `getJobCountForFactoryAndQueue - general`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -503,7 +526,7 @@ class FastJobStorageTest { } @Test - fun areQueuesEmpty_allNonEmpty() { + fun `areQueuesEmpty - all non-empty`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -512,7 +535,7 @@ class FastJobStorageTest { } @Test - fun areQueuesEmpty_mixedEmpty() { + fun `areQueuesEmpty - mixed empty`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -520,7 +543,7 @@ class FastJobStorageTest { } @Test - fun areQueuesEmpty_queueDoesNotExist() { + fun `areQueuesEmpty - queue does not exist`() { val subject = FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)) subject.init() @@ -549,7 +572,8 @@ class FastJobStorageTest { factoryKey: String, queueKey: String? = null, createTime: Long = 1, - nextRunAttemptTime: Long = 1, + lastRunAttemptTime: Long = 1, + nextBackoffInterval: Long = 0, runAttempt: Int = 1, maxAttempts: Int = 1, lifespan: Long = 1, @@ -563,7 +587,8 @@ class FastJobStorageTest { factoryKey = factoryKey, queueKey = queueKey, createTime = createTime, - nextRunAttemptTime = nextRunAttemptTime, + lastRunAttemptTime = lastRunAttemptTime, + nextBackoffInterval = nextBackoffInterval, runAttempt = runAttempt, maxAttempts = maxAttempts, lifespan = lifespan, @@ -580,7 +605,8 @@ class FastJobStorageTest { factoryKey = "f1", queueKey = "q1", createTime = 1, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5, @@ -594,7 +620,8 @@ class FastJobStorageTest { factoryKey = "f2", queueKey = "q2", createTime = 1, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5, @@ -608,7 +635,8 @@ class FastJobStorageTest { factoryKey = "f3", queueKey = "q3", createTime = 1, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5, @@ -654,7 +682,8 @@ class FastJobStorageTest { factoryKey = "f1", queueKey = "q1", createTime = 1, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5, @@ -674,7 +703,8 @@ class FastJobStorageTest { factoryKey = "f1", queueKey = "q1", createTime = 1, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5, @@ -688,7 +718,8 @@ class FastJobStorageTest { factoryKey = "f2", queueKey = "q1", createTime = 2, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5, @@ -702,7 +733,8 @@ class FastJobStorageTest { factoryKey = "f3", queueKey = "q3", createTime = 3, - nextRunAttemptTime = 2, + lastRunAttemptTime = 2, + nextBackoffInterval = 0, runAttempt = 3, maxAttempts = 4, lifespan = 5,