Change job scheduling to be relative rather than absolute.

This commit is contained in:
Greyson Parrelli
2023-08-30 09:51:08 -04:00
committed by Nicholas Tinsley
parent 64babe2e42
commit a911a007d2
10 changed files with 199 additions and 121 deletions

View File

@@ -53,7 +53,8 @@ class JobDatabase(
const val FACTORY_KEY = "factory_key"
const val QUEUE_KEY = "queue_key"
const val CREATE_TIME = "create_time"
const val NEXT_RUN_ATTEMPT_TIME = "next_run_attempt_time"
const val LAST_RUN_ATTEMPT_TIME = "last_run_attempt_time"
const val NEXT_BACKOFF_INTERVAL = "next_backoff_interval"
const val RUN_ATTEMPT = "run_attempt"
const val MAX_ATTEMPTS = "max_attempts"
const val LIFESPAN = "lifespan"
@@ -69,13 +70,14 @@ class JobDatabase(
$FACTORY_KEY TEXT,
$QUEUE_KEY TEXT,
$CREATE_TIME INTEGER,
$NEXT_RUN_ATTEMPT_TIME INTEGER,
$LAST_RUN_ATTEMPT_TIME INTEGER,
$RUN_ATTEMPT INTEGER,
$MAX_ATTEMPTS INTEGER,
$LIFESPAN INTEGER,
$SERIALIZED_DATA TEXT,
$SERIALIZED_INPUT_DATA TEXT DEFAULT NULL,
$IS_RUNNING INTEGER
$IS_RUNNING INTEGER,
$NEXT_BACKOFF_INTERVAL INTEGER
)
""".trimIndent()
}
@@ -139,6 +141,12 @@ class JobDatabase(
override fun onUpgrade(db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
Log.i(TAG, "onUpgrade($oldVersion, $newVersion)")
if (oldVersion < 2) {
db.execSQL("ALTER TABLE job_spec RENAME COLUMN next_run_attempt_time TO last_run_attempt_time")
db.execSQL("ALTER TABLE job_spec ADD COLUMN next_backoff_interval INTEGER")
db.execSQL("UPDATE job_spec SET last_run_attempt_time = 0")
}
}
override fun onOpen(db: SQLiteDatabase) {
@@ -178,22 +186,25 @@ class JobDatabase(
}
@Synchronized
fun updateJobRunningState(id: String, isRunning: Boolean) {
fun markJobAsRunning(id: String, currentTime: Long) {
writableDatabase
.update(Jobs.TABLE_NAME)
.values(Jobs.IS_RUNNING to if (isRunning) 1 else 0)
.values(
Jobs.IS_RUNNING to 1,
Jobs.LAST_RUN_ATTEMPT_TIME to currentTime
)
.where("${Jobs.JOB_SPEC_ID} = ?", id)
.run()
}
@Synchronized
fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextRunAttemptTime: Long, serializedData: ByteArray?) {
fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?) {
writableDatabase
.update(Jobs.TABLE_NAME)
.values(
Jobs.IS_RUNNING to if (isRunning) 1 else 0,
Jobs.RUN_ATTEMPT to runAttempt,
Jobs.NEXT_RUN_ATTEMPT_TIME to nextRunAttemptTime,
Jobs.NEXT_BACKOFF_INTERVAL to nextBackoffInterval,
Jobs.SERIALIZED_DATA to serializedData
)
.where("${Jobs.JOB_SPEC_ID} = ?", id)
@@ -224,7 +235,8 @@ class JobDatabase(
Jobs.FACTORY_KEY to job.factoryKey,
Jobs.QUEUE_KEY to job.queueKey,
Jobs.CREATE_TIME to job.createTime,
Jobs.NEXT_RUN_ATTEMPT_TIME to job.nextRunAttemptTime,
Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime,
Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval,
Jobs.RUN_ATTEMPT to job.runAttempt,
Jobs.MAX_ATTEMPTS to job.maxAttempts,
Jobs.LIFESPAN to job.lifespan,
@@ -296,7 +308,8 @@ class JobDatabase(
Jobs.FACTORY_KEY to job.factoryKey,
Jobs.QUEUE_KEY to job.queueKey,
Jobs.CREATE_TIME to job.createTime,
Jobs.NEXT_RUN_ATTEMPT_TIME to job.nextRunAttemptTime,
Jobs.LAST_RUN_ATTEMPT_TIME to job.lastRunAttemptTime,
Jobs.NEXT_BACKOFF_INTERVAL to job.nextBackoffInterval,
Jobs.RUN_ATTEMPT to job.runAttempt,
Jobs.MAX_ATTEMPTS to job.maxAttempts,
Jobs.LIFESPAN to job.lifespan,
@@ -343,7 +356,8 @@ class JobDatabase(
factoryKey = cursor.requireNonNullString(Jobs.FACTORY_KEY),
queueKey = cursor.requireString(Jobs.QUEUE_KEY),
createTime = cursor.requireLong(Jobs.CREATE_TIME),
nextRunAttemptTime = cursor.requireLong(Jobs.NEXT_RUN_ATTEMPT_TIME),
lastRunAttemptTime = cursor.requireLong(Jobs.LAST_RUN_ATTEMPT_TIME),
nextBackoffInterval = cursor.requireLong(Jobs.NEXT_BACKOFF_INTERVAL),
runAttempt = cursor.requireInt(Jobs.RUN_ATTEMPT),
maxAttempts = cursor.requireInt(Jobs.MAX_ATTEMPTS),
lifespan = cursor.requireLong(Jobs.LIFESPAN),
@@ -383,7 +397,7 @@ class JobDatabase(
companion object {
private val TAG = Log.tag(JobDatabase::class.java)
private const val DATABASE_VERSION = 1
private const val DATABASE_VERSION = 2
private const val DATABASE_NAME = "signal-jobmanager.db"
@SuppressLint("StaticFieldLeak")
@@ -411,7 +425,8 @@ class JobDatabase(
values.put(Jobs.FACTORY_KEY, CursorUtil.requireString(cursor, "factory_key"))
values.put(Jobs.QUEUE_KEY, CursorUtil.requireString(cursor, "queue_key"))
values.put(Jobs.CREATE_TIME, CursorUtil.requireLong(cursor, "create_time"))
values.put(Jobs.NEXT_RUN_ATTEMPT_TIME, CursorUtil.requireLong(cursor, "next_run_attempt_time"))
values.put(Jobs.LAST_RUN_ATTEMPT_TIME, 0)
values.put(Jobs.NEXT_BACKOFF_INTERVAL, 0)
values.put(Jobs.RUN_ATTEMPT, CursorUtil.requireInt(cursor, "run_attempt"))
values.put(Jobs.MAX_ATTEMPTS, CursorUtil.requireInt(cursor, "max_attempts"))
values.put(Jobs.LIFESPAN, CursorUtil.requireLong(cursor, "lifespan"))

View File

@@ -38,7 +38,8 @@ public abstract class Job {
private final Parameters parameters;
private int runAttempt;
private long nextRunAttemptTime;
private long lastRunAttemptTime;
private long nextBackoffInterval;
private volatile boolean canceled;
@@ -60,8 +61,12 @@ public abstract class Job {
return runAttempt;
}
public final long getNextRunAttemptTime() {
return nextRunAttemptTime;
public final long getLastRunAttemptTime() {
return lastRunAttemptTime;
}
public final long getNextBackoffInterval() {
return nextBackoffInterval;
}
public final @Nullable byte[] getInputData() {
@@ -86,8 +91,13 @@ public abstract class Job {
}
/** Should only be invoked by {@link JobController} */
final void setNextRunAttemptTime(long nextRunAttemptTime) {
this.nextRunAttemptTime = nextRunAttemptTime;
final void setLastRunAttemptTime(long lastRunAttemptTime) {
this.lastRunAttemptTime = lastRunAttemptTime;
}
/** Should only be invoked by {@link JobController} */
final void setNextBackoffInterval(long nextBackoffInterval) {
this.nextBackoffInterval = nextBackoffInterval;
}
/** Should only be invoked by {@link JobController} */

View File

@@ -250,11 +250,10 @@ class JobController {
throw new IllegalArgumentException("Invalid backoff interval! " + backoffInterval);
}
int nextRunAttempt = job.getRunAttempt() + 1;
long nextRunAttemptTime = System.currentTimeMillis() + backoffInterval;
byte[] serializedData = job.serialize();
int nextRunAttempt = job.getRunAttempt() + 1;
byte[] serializedData = job.serialize();
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime, serializedData);
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, backoffInterval, serializedData);
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId()))
@@ -263,10 +262,8 @@ class JobController {
.toList();
long delay = Math.max(0, nextRunAttemptTime - System.currentTimeMillis());
Log.i(TAG, JobLogger.format(job, "Scheduling a retry in " + delay + " ms."));
scheduler.schedule(delay, constraints);
Log.i(TAG, JobLogger.format(job, "Scheduling a retry in " + backoffInterval + " ms."));
scheduler.schedule(backoffInterval, constraints);
notifyAll();
}
@@ -338,7 +335,7 @@ class JobController {
wait();
}
jobStorage.updateJobRunningState(job.getId(), true);
jobStorage.markJobAsRunning(job.getId(), System.currentTimeMillis());
runningJobs.put(job.getId(), job);
jobTracker.onStateChange(job, JobTracker.JobState.RUNNING);
@@ -445,7 +442,8 @@ class JobController {
job.getFactoryKey(),
job.getParameters().getQueue(),
System.currentTimeMillis(),
job.getNextRunAttemptTime(),
job.getLastRunAttemptTime(),
job.getNextBackoffInterval(),
job.getRunAttempt(),
job.getParameters().getMaxAttempts(),
job.getParameters().getLifespan(),
@@ -511,7 +509,8 @@ class JobController {
Job job = jobInstantiator.instantiate(jobSpec.getFactoryKey(), parameters, jobSpec.getSerializedData());
job.setRunAttempt(jobSpec.getRunAttempt());
job.setNextRunAttemptTime(jobSpec.getNextRunAttemptTime());
job.setLastRunAttemptTime(jobSpec.getLastRunAttemptTime());
job.setNextBackoffInterval(jobSpec.getNextBackoffInterval());
job.setContext(application);
return job;
@@ -547,7 +546,8 @@ class JobController {
jobSpec.getFactoryKey(),
jobSpec.getQueueKey(),
jobSpec.getCreateTime(),
jobSpec.getNextRunAttemptTime(),
jobSpec.getLastRunAttemptTime(),
jobSpec.getNextBackoffInterval(),
jobSpec.getRunAttempt(),
jobSpec.getMaxAttempts(),
jobSpec.getLifespan(),

View File

@@ -65,7 +65,8 @@ public class JobMigrator {
updatedJobData.getFactoryKey(),
updatedJobData.getQueueKey(),
jobSpec.getCreateTime(),
jobSpec.getNextRunAttemptTime(),
jobSpec.getLastRunAttemptTime(),
jobSpec.getNextBackoffInterval(),
jobSpec.getRunAttempt(),
jobSpec.getMaxAttempts(),
jobSpec.getLifespan(),

View File

@@ -5,7 +5,8 @@ data class JobSpec(
val factoryKey: String,
val queueKey: String?,
val createTime: Long,
val nextRunAttemptTime: Long,
val lastRunAttemptTime: Long,
val nextBackoffInterval: Long,
val runAttempt: Int,
val maxAttempts: Int,
val lifespan: Long,
@@ -15,8 +16,8 @@ data class JobSpec(
val isMemoryOnly: Boolean
) {
fun withNextRunAttemptTime(updated: Long): JobSpec {
return copy(nextRunAttemptTime = updated)
fun withNextBackoffInterval(updated: Long): JobSpec {
return copy(nextBackoffInterval = updated)
}
fun withData(updatedSerializedData: ByteArray?): JobSpec {
@@ -24,7 +25,7 @@ data class JobSpec(
}
override fun toString(): String {
return "id: JOB::$id | factoryKey: $factoryKey | queueKey: $queueKey | createTime: $createTime | nextRunAttemptTime: $nextRunAttemptTime | runAttempt: $runAttempt | maxAttempts: $maxAttempts | lifespan: $lifespan | isRunning: $isRunning | memoryOnly: $isMemoryOnly"
return "id: JOB::$id | factoryKey: $factoryKey | queueKey: $queueKey | createTime: $createTime | lastRunAttemptTime: $lastRunAttemptTime | nextBackoffInterval: $nextBackoffInterval | runAttempt: $runAttempt | maxAttempts: $maxAttempts | lifespan: $lifespan | isRunning: $isRunning | memoryOnly: $isMemoryOnly"
}
override fun equals(other: Any?): Boolean {
@@ -37,7 +38,8 @@ data class JobSpec(
if (factoryKey != other.factoryKey) return false
if (queueKey != other.queueKey) return false
if (createTime != other.createTime) return false
if (nextRunAttemptTime != other.nextRunAttemptTime) return false
if (lastRunAttemptTime != other.lastRunAttemptTime) return false
if (nextBackoffInterval != other.nextBackoffInterval) return false
if (runAttempt != other.runAttempt) return false
if (maxAttempts != other.maxAttempts) return false
if (lifespan != other.lifespan) return false
@@ -60,7 +62,8 @@ data class JobSpec(
result = 31 * result + factoryKey.hashCode()
result = 31 * result + (queueKey?.hashCode() ?: 0)
result = 31 * result + createTime.hashCode()
result = 31 * result + nextRunAttemptTime.hashCode()
result = 31 * result + lastRunAttemptTime.hashCode()
result = 31 * result + nextBackoffInterval.hashCode()
result = 31 * result + runAttempt
result = 31 * result + maxAttempts
result = 31 * result + lifespan.hashCode()

View File

@@ -37,7 +37,7 @@ public interface JobStorage {
boolean areQueuesEmpty(@NonNull Set<String> queueKeys);
@WorkerThread
void updateJobRunningState(@NonNull String id, boolean isRunning);
void markJobAsRunning(@NonNull String id, long currentTime);
@WorkerThread
void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @Nullable byte[] serializedData);

View File

@@ -58,22 +58,27 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec> {
val migrationJob: JobSpec? = getMigrationJob()
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.nextRunAttemptTime <= currentTime) {
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) {
listOf(migrationJob)
} else if (migrationJob != null) {
emptyList()
} else {
jobs
.groupBy { it.queueKey ?: it.id }
.groupBy {
// Group together by queue. If it doesn't have a queue, we just use the ID, since it's unique and will give us all of the jobs without queues.
it.queueKey ?: it.id
}
.map { byQueueKey: Map.Entry<String, List<JobSpec>> ->
// Find the oldest job in each queue
byQueueKey.value.minByOrNull { it.createTime }
}
.filterNotNull()
.filter { job ->
// Filter out all jobs with unmet dependencies
dependenciesByJobId[job.id].isNullOrEmpty()
}
.filterNot { it.isRunning }
.filter { job -> job.nextRunAttemptTime <= currentTime }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.sortedBy { it.createTime }
}
}
@@ -123,10 +128,10 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
@Synchronized
override fun updateJobRunningState(id: String, isRunning: Boolean) {
override fun markJobAsRunning(id: String, currentTime: Long) {
val job: JobSpec? = getJobById(id)
if (job == null || !job.isMemoryOnly) {
jobDatabase.updateJobRunningState(id, isRunning)
jobDatabase.markJobAsRunning(id, currentTime)
}
val iter = jobs.listIterator()
@@ -134,16 +139,21 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
while (iter.hasNext()) {
val current: JobSpec = iter.next()
if (current.id == id) {
iter.set(current.copy(isRunning = isRunning))
iter.set(
current.copy(
isRunning = true,
lastRunAttemptTime = currentTime
)
)
}
}
}
@Synchronized
override fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextRunAttemptTime: Long, serializedData: ByteArray?) {
override fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextBackoffInterval: Long, serializedData: ByteArray?) {
val job = getJobById(id)
if (job == null || !job.isMemoryOnly) {
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData)
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextBackoffInterval, serializedData)
}
val iter = jobs.listIterator()
@@ -154,7 +164,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
current.copy(
isRunning = isRunning,
runAttempt = runAttempt,
nextRunAttemptTime = nextRunAttemptTime,
nextBackoffInterval = nextBackoffInterval,
serializedData = serializedData
)
)
@@ -304,4 +314,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
return dependsOnJob.createTime > job.createTime
}
/**
* Whether or not the job's eligible to be run based off of it's [Job.nextBackoffInterval] and other properties.
*/
private fun JobSpec.hasEligibleRunTime(currentTime: Long): Boolean {
return this.lastRunAttemptTime > currentTime || (this.lastRunAttemptTime + this.nextBackoffInterval) < currentTime
}
}

View File

@@ -37,9 +37,9 @@ public final class RateLimitUtil {
ApplicationDependencies.getJobManager().update((job) -> {
if (job.getFactoryKey().equals(IndividualSendJob.KEY) && messageIds.contains(IndividualSendJob.getMessageId(job.getSerializedData()))) {
return job.withNextRunAttemptTime(System.currentTimeMillis());
return job.withNextBackoffInterval(0);
} else if (job.getFactoryKey().equals(PushGroupSendJob.KEY) && messageIds.contains(PushGroupSendJob.getMessageId(job.getSerializedData()))) {
return job.withNextRunAttemptTime(System.currentTimeMillis());
return job.withNextBackoffInterval(0);
} else {
return job;
}