Add support for setting max instances per job queue.

This commit is contained in:
Greyson Parrelli
2020-12-07 17:30:05 -05:00
committed by GitHub
parent dc4ce234b7
commit c3d7b88cf6
29 changed files with 136 additions and 73 deletions

View File

@@ -241,7 +241,8 @@ public abstract class Job {
private final long lifespan;
private final int maxAttempts;
private final long maxBackoff;
private final int maxInstances;
private final int maxInstancesForFactory;
private final int maxInstancesForQueue;
private final String queue;
private final List<String> constraintKeys;
private final Data inputData;
@@ -252,22 +253,24 @@ public abstract class Job {
long lifespan,
int maxAttempts,
long maxBackoff,
int maxInstances,
int maxInstancesForFactory,
int maxInstancesForQueue,
@Nullable String queue,
@NonNull List<String> constraintKeys,
@Nullable Data inputData,
boolean memoryOnly)
{
this.id = id;
this.createTime = createTime;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxBackoff = maxBackoff;
this.maxInstances = maxInstances;
this.queue = queue;
this.constraintKeys = constraintKeys;
this.inputData = inputData;
this.memoryOnly = memoryOnly;
this.id = id;
this.createTime = createTime;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxBackoff = maxBackoff;
this.maxInstancesForFactory = maxInstancesForFactory;
this.maxInstancesForQueue = maxInstancesForQueue;
this.queue = queue;
this.constraintKeys = constraintKeys;
this.inputData = inputData;
this.memoryOnly = memoryOnly;
}
@NonNull String getId() {
@@ -290,8 +293,12 @@ public abstract class Job {
return maxBackoff;
}
int getMaxInstances() {
return maxInstances;
int getMaxInstancesForFactory() {
return maxInstancesForFactory;
}
int getMaxInstancesForQueue() {
return maxInstancesForQueue;
}
public @Nullable String getQueue() {
@@ -311,18 +318,18 @@ public abstract class Job {
}
public Builder toBuilder() {
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys, inputData, memoryOnly);
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
}
public static final class Builder {
private String id;
private long createTime;
private long maxBackoff;
private long lifespan;
private int maxAttempts;
private int maxInstances;
private int maxInstancesForFactory;
private int maxInstancesForQueue;
private String queue;
private List<String> constraintKeys;
private Data inputData;
@@ -333,7 +340,7 @@ public abstract class Job {
}
Builder(@NonNull String id) {
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>(), null, false);
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, UNLIMITED, null, new LinkedList<>(), null, false);
}
private Builder(@NonNull String id,
@@ -341,22 +348,24 @@ public abstract class Job {
long maxBackoff,
long lifespan,
int maxAttempts,
int maxInstances,
int maxInstancesForFactory,
int maxInstancesForQueue,
@Nullable String queue,
@NonNull List<String> constraintKeys,
@Nullable Data inputData,
boolean memoryOnly)
{
this.id = id;
this.createTime = createTime;
this.maxBackoff = maxBackoff;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxInstances = maxInstances;
this.queue = queue;
this.constraintKeys = constraintKeys;
this.inputData = inputData;
this.memoryOnly = memoryOnly;
this.id = id;
this.createTime = createTime;
this.maxBackoff = maxBackoff;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
this.maxInstancesForFactory = maxInstancesForFactory;
this.maxInstancesForQueue = maxInstancesForQueue;
this.queue = queue;
this.constraintKeys = constraintKeys;
this.inputData = inputData;
this.memoryOnly = memoryOnly;
}
/** Should only be invoked by {@link JobController} */
@@ -391,17 +400,31 @@ public abstract class Job {
}
/**
* Specify the maximum number of instances you'd want of this job at any given time. If
* enqueueing this job would put it over that limit, it will be ignored.
*
* Duplicates are determined by two jobs having the same {@link Job#getFactoryKey()}.
* Specify the maximum number of instances you'd want of this job at any given time, as
* determined by the job's factory key. If enqueueing this job would put it over that limit,
* it will be ignored.
*
* This property is ignored if the job is submitted as part of a {@link JobManager.Chain}.
*
* Defaults to {@link #UNLIMITED}.
*/
public @NonNull Builder setMaxInstances(int maxInstances) {
this.maxInstances = maxInstances;
public @NonNull Builder setMaxInstancesForFactory(int maxInstancesForFactory) {
this.maxInstancesForFactory = maxInstancesForFactory;
return this;
}
/**
* Specify the maximum number of instances you'd want of this job at any given time, as
* determined by the job's queue key. If enqueueing this job would put it over that limit,
* it will be ignored.
*
* This property is ignored if the job is submitted as part of a {@link JobManager.Chain}, or
* if the job has no queue key.
*
* Defaults to {@link #UNLIMITED}.
*/
public @NonNull Builder setMaxInstancesForQueue(int maxInstancesForQueue) {
this.maxInstancesForQueue = maxInstancesForQueue;
return this;
}
@@ -455,7 +478,7 @@ public abstract class Job {
}
public @NonNull Parameters build() {
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys, inputData, memoryOnly);
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstancesForFactory, maxInstancesForQueue, queue, constraintKeys, inputData, memoryOnly);
}
}
}

View File

@@ -89,7 +89,7 @@ class JobController {
if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0);
jobTracker.onStateChange(solo, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping."));
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count. Factory limit: " + solo.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + solo.getParameters().getMaxInstancesForQueue() + ". Skipping."));
return;
}
@@ -105,7 +105,7 @@ class JobController {
if (chainExceedsMaximumInstances(chain)) {
jobTracker.onStateChange(job, JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(job, "Already at the max instance count of " + job.getParameters().getMaxInstances() + ". Skipping."));
Log.w(TAG, JobLogger.format(job, "Already at the max instance count. Factory limit: " + job.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + job.getParameters().getMaxInstancesForQueue() + ". Skipping."));
return;
}
@@ -299,12 +299,22 @@ class JobController {
if (chain.size() == 1 && chain.get(0).size() == 1) {
Job solo = chain.get(0).get(0);
if (solo.getParameters().getMaxInstances() != Job.Parameters.UNLIMITED &&
jobStorage.getJobInstanceCount(solo.getFactoryKey()) >= solo.getParameters().getMaxInstances())
{
boolean exceedsFactory = solo.getParameters().getMaxInstancesForFactory() != Job.Parameters.UNLIMITED &&
jobStorage.getJobCountForFactory(solo.getFactoryKey()) >= solo.getParameters().getMaxInstancesForFactory();
if (exceedsFactory) {
return true;
}
boolean exceedsQueue = solo.getParameters().getQueue() != null &&
solo.getParameters().getMaxInstancesForQueue() != Job.Parameters.UNLIMITED &&
jobStorage.getJobCountForQueue(solo.getParameters().getQueue()) >= solo.getParameters().getMaxInstancesForQueue();
if (exceedsQueue) {
return true;
}
}
return false;
}
@@ -345,7 +355,7 @@ class JobController {
job.getParameters().getMaxAttempts(),
job.getParameters().getMaxBackoff(),
job.getParameters().getLifespan(),
job.getParameters().getMaxInstances(),
job.getParameters().getMaxInstancesForFactory(),
dataSerializer.serialize(job.serialize()),
null,
false,
@@ -459,7 +469,7 @@ class JobController {
jobSpec.getMaxAttempts(),
jobSpec.getMaxBackoff(),
jobSpec.getLifespan(),
jobSpec.getMaxInstances(),
jobSpec.getMaxInstancesForFactory(),
jobSpec.getSerializedData(),
dataSerializer.serialize(inputData),
jobSpec.isRunning(),

View File

@@ -71,7 +71,7 @@ public class JobMigrator {
jobSpec.getMaxAttempts(),
jobSpec.getMaxBackoff(),
jobSpec.getLifespan(),
jobSpec.getMaxInstances(),
jobSpec.getMaxInstancesForFactory(),
dataSerializer.serialize(updatedJobData.getData()),
jobSpec.getSerializedInputData(),
jobSpec.isRunning(),

View File

@@ -87,7 +87,7 @@ public final class JobSpec {
return maxBackoff;
}
public int getMaxInstances() {
public int getMaxInstancesForFactory() {
return maxInstances;
}

View File

@@ -27,7 +27,10 @@ public interface JobStorage {
@NonNull List<JobSpec> getJobsInQueue(@NonNull String queue);
@WorkerThread
int getJobInstanceCount(@NonNull String factoryKey);
int getJobCountForFactory(@NonNull String factoryKey);
@WorkerThread
int getJobCountForQueue(@NonNull String queueKey);
@WorkerThread
void updateJobRunningState(@NonNull String id, boolean isRunning);