mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-04-23 02:10:44 +01:00
Convert FastJobStorage to kotlin.
This commit is contained in:
committed by
Alex Hart
parent
2ca4c2d1c1
commit
2a9576baf5
@@ -5,6 +5,7 @@ import android.content.ContentValues;
|
||||
import android.database.Cursor;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.Nullable;
|
||||
|
||||
import com.annimon.stream.Stream;
|
||||
|
||||
@@ -194,7 +195,7 @@ public class JobDatabase extends SQLiteOpenHelper implements SignalDatabaseOpenH
|
||||
getWritableDatabase().update(Jobs.TABLE_NAME, contentValues, query, args);
|
||||
}
|
||||
|
||||
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull byte[] serializedData) {
|
||||
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @Nullable byte[] serializedData) {
|
||||
ContentValues contentValues = new ContentValues();
|
||||
contentValues.put(Jobs.IS_RUNNING, isRunning ? 1 : 0);
|
||||
contentValues.put(Jobs.RUN_ATTEMPT, runAttempt);
|
||||
|
||||
@@ -1,395 +0,0 @@
|
||||
package org.thoughtcrime.securesms.jobs;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.Nullable;
|
||||
|
||||
import com.annimon.stream.Collectors;
|
||||
import com.annimon.stream.Stream;
|
||||
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.thoughtcrime.securesms.database.JobDatabase;
|
||||
import org.thoughtcrime.securesms.jobmanager.Job;
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec;
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec;
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec;
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
|
||||
import org.thoughtcrime.securesms.util.Util;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class FastJobStorage implements JobStorage {
|
||||
|
||||
private static final String TAG = Log.tag(FastJobStorage.class);
|
||||
|
||||
private final JobDatabase jobDatabase;
|
||||
|
||||
private final List<JobSpec> jobs;
|
||||
private final Map<String, List<ConstraintSpec>> constraintsByJobId;
|
||||
private final Map<String, List<DependencySpec>> dependenciesByJobId;
|
||||
|
||||
public FastJobStorage(@NonNull JobDatabase jobDatabase) {
|
||||
this.jobDatabase = jobDatabase;
|
||||
this.jobs = new ArrayList<>();
|
||||
this.constraintsByJobId = new HashMap<>();
|
||||
this.dependenciesByJobId = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void init() {
|
||||
List<JobSpec> jobSpecs = jobDatabase.getAllJobSpecs();
|
||||
List<ConstraintSpec> constraintSpecs = jobDatabase.getAllConstraintSpecs();
|
||||
List<DependencySpec> dependencySpecs = jobDatabase.getAllDependencySpecs();
|
||||
|
||||
jobs.addAll(jobSpecs);
|
||||
|
||||
for (ConstraintSpec constraintSpec: constraintSpecs) {
|
||||
List<ConstraintSpec> jobConstraints = Util.getOrDefault(constraintsByJobId, constraintSpec.getJobSpecId(), new LinkedList<>());
|
||||
jobConstraints.add(constraintSpec);
|
||||
constraintsByJobId.put(constraintSpec.getJobSpecId(), jobConstraints);
|
||||
}
|
||||
|
||||
for (DependencySpec dependencySpec : dependencySpecs) {
|
||||
List<DependencySpec> jobDependencies = Util.getOrDefault(dependenciesByJobId, dependencySpec.getJobId(), new LinkedList<>());
|
||||
jobDependencies.add(dependencySpec);
|
||||
dependenciesByJobId.put(dependencySpec.getJobId(), jobDependencies);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void insertJobs(@NonNull List<FullSpec> fullSpecs) {
|
||||
List<FullSpec> durable = Stream.of(fullSpecs).filterNot(FullSpec::isMemoryOnly).toList();
|
||||
if (durable.size() > 0) {
|
||||
jobDatabase.insertJobs(durable);
|
||||
}
|
||||
|
||||
for (FullSpec fullSpec : fullSpecs) {
|
||||
jobs.add(fullSpec.getJobSpec());
|
||||
constraintsByJobId.put(fullSpec.getJobSpec().getId(), fullSpec.getConstraintSpecs());
|
||||
dependenciesByJobId.put(fullSpec.getJobSpec().getId(), fullSpec.getDependencySpecs());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @Nullable JobSpec getJobSpec(@NonNull String id) {
|
||||
for (JobSpec jobSpec : jobs) {
|
||||
if (jobSpec.getId().equals(id)) {
|
||||
return jobSpec;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @NonNull List<JobSpec> getAllJobSpecs() {
|
||||
return new ArrayList<>(jobs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @NonNull List<JobSpec> getPendingJobsWithNoDependenciesInCreatedOrder(long currentTime) {
|
||||
Optional<JobSpec> migrationJob = getMigrationJob();
|
||||
|
||||
if (migrationJob.isPresent() && !migrationJob.get().isRunning() && migrationJob.get().getNextRunAttemptTime() <= currentTime) {
|
||||
return Collections.singletonList(migrationJob.get());
|
||||
} else if (migrationJob.isPresent()) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
return Stream.of(jobs)
|
||||
.groupBy(jobSpec -> {
|
||||
String queueKey = jobSpec.getQueueKey();
|
||||
if (queueKey != null) {
|
||||
return queueKey;
|
||||
} else {
|
||||
return jobSpec.getId();
|
||||
}
|
||||
})
|
||||
.map(byQueueKey ->
|
||||
Stream.of(byQueueKey.getValue()).sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
|
||||
.findFirst()
|
||||
.orElse(null)
|
||||
)
|
||||
.withoutNulls()
|
||||
.filter(j -> {
|
||||
List<DependencySpec> dependencies = dependenciesByJobId.get(j.getId());
|
||||
return dependencies == null || dependencies.isEmpty();
|
||||
})
|
||||
.filterNot(JobSpec::isRunning)
|
||||
.filter(j -> j.getNextRunAttemptTime() <= currentTime)
|
||||
.sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @NonNull List<JobSpec> getJobsInQueue(@NonNull String queue) {
|
||||
return Stream.of(jobs)
|
||||
.filter(j -> queue.equals(j.getQueueKey()))
|
||||
.sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private Optional<JobSpec> getMigrationJob() {
|
||||
return Optional.ofNullable(Stream.of(jobs)
|
||||
.filter(j -> Job.Parameters.MIGRATION_QUEUE_KEY.equals(j.getQueueKey()))
|
||||
.filter(this::firstInQueue)
|
||||
.findFirst()
|
||||
.orElse(null));
|
||||
}
|
||||
|
||||
private boolean firstInQueue(@NonNull JobSpec job) {
|
||||
if (job.getQueueKey() == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return Stream.of(jobs)
|
||||
.filter(j -> Util.equals(j.getQueueKey(), job.getQueueKey()))
|
||||
.sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
|
||||
.toList()
|
||||
.get(0)
|
||||
.equals(job);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getJobCountForFactory(@NonNull String factoryKey) {
|
||||
return (int) Stream.of(jobs)
|
||||
.filter(j -> j.getFactoryKey().equals(factoryKey))
|
||||
.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getJobCountForFactoryAndQueue(@NonNull String factoryKey, @NonNull String queueKey) {
|
||||
return (int) Stream.of(jobs)
|
||||
.filter(j -> factoryKey.equals(j.getFactoryKey()) &&
|
||||
queueKey.equals(j.getQueueKey()))
|
||||
.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean areQueuesEmpty(@NonNull Set<String> queueKeys) {
|
||||
return Stream.of(jobs)
|
||||
.noneMatch(j -> j.getQueueKey() != null && queueKeys.contains(j.getQueueKey()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) {
|
||||
JobSpec job = getJobById(id);
|
||||
if (job == null || !job.isMemoryOnly()) {
|
||||
jobDatabase.updateJobRunningState(id, isRunning);
|
||||
}
|
||||
|
||||
ListIterator<JobSpec> iter = jobs.listIterator();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
JobSpec existing = iter.next();
|
||||
if (existing.getId().equals(id)) {
|
||||
JobSpec updated = new JobSpec(existing.getId(),
|
||||
existing.getFactoryKey(),
|
||||
existing.getQueueKey(),
|
||||
existing.getCreateTime(),
|
||||
existing.getNextRunAttemptTime(),
|
||||
existing.getRunAttempt(),
|
||||
existing.getMaxAttempts(),
|
||||
existing.getLifespan(),
|
||||
existing.getSerializedData(),
|
||||
existing.getSerializedInputData(),
|
||||
isRunning,
|
||||
existing.isMemoryOnly());
|
||||
iter.set(updated);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull byte[] serializedData) {
|
||||
JobSpec job = getJobById(id);
|
||||
if (job == null || !job.isMemoryOnly()) {
|
||||
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData);
|
||||
}
|
||||
|
||||
ListIterator<JobSpec> iter = jobs.listIterator();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
JobSpec existing = iter.next();
|
||||
if (existing.getId().equals(id)) {
|
||||
JobSpec updated = new JobSpec(existing.getId(),
|
||||
existing.getFactoryKey(),
|
||||
existing.getQueueKey(),
|
||||
existing.getCreateTime(),
|
||||
nextRunAttemptTime,
|
||||
runAttempt,
|
||||
existing.getMaxAttempts(),
|
||||
existing.getLifespan(),
|
||||
serializedData,
|
||||
existing.getSerializedInputData(),
|
||||
isRunning,
|
||||
existing.isMemoryOnly());
|
||||
iter.set(updated);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateAllJobsToBePending() {
|
||||
jobDatabase.updateAllJobsToBePending();
|
||||
|
||||
ListIterator<JobSpec> iter = jobs.listIterator();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
JobSpec existing = iter.next();
|
||||
JobSpec updated = new JobSpec(existing.getId(),
|
||||
existing.getFactoryKey(),
|
||||
existing.getQueueKey(),
|
||||
existing.getCreateTime(),
|
||||
existing.getNextRunAttemptTime(),
|
||||
existing.getRunAttempt(),
|
||||
existing.getMaxAttempts(),
|
||||
existing.getLifespan(),
|
||||
existing.getSerializedData(),
|
||||
existing.getSerializedInputData(),
|
||||
false,
|
||||
existing.isMemoryOnly());
|
||||
iter.set(updated);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateJobs(@NonNull List<JobSpec> jobSpecs) {
|
||||
List<JobSpec> durable = new ArrayList<>(jobSpecs.size());
|
||||
for (JobSpec update : jobSpecs) {
|
||||
JobSpec found = getJobById(update.getId());
|
||||
if (found == null || !found.isMemoryOnly()) {
|
||||
durable.add(update);
|
||||
}
|
||||
}
|
||||
|
||||
if (durable.size() > 0) {
|
||||
jobDatabase.updateJobs(durable);
|
||||
}
|
||||
|
||||
Map<String, JobSpec> updates = Stream.of(jobSpecs).collect(Collectors.toMap(JobSpec::getId));
|
||||
ListIterator<JobSpec> iter = jobs.listIterator();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
JobSpec existing = iter.next();
|
||||
JobSpec update = updates.get(existing.getId());
|
||||
|
||||
if (update != null) {
|
||||
iter.set(update);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void deleteJob(@NonNull String jobId) {
|
||||
deleteJobs(Collections.singletonList(jobId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void deleteJobs(@NonNull List<String> jobIds) {
|
||||
List<String> durableIds = new ArrayList<>(jobIds.size());
|
||||
for (String id : jobIds) {
|
||||
JobSpec job = getJobById(id);
|
||||
if (job == null || !job.isMemoryOnly()) {
|
||||
durableIds.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
if (durableIds.size() > 0) {
|
||||
jobDatabase.deleteJobs(durableIds);
|
||||
}
|
||||
|
||||
Set<String> deleteIds = new HashSet<>(jobIds);
|
||||
|
||||
Iterator<JobSpec> jobIter = jobs.iterator();
|
||||
while (jobIter.hasNext()) {
|
||||
if (deleteIds.contains(jobIter.next().getId())) {
|
||||
jobIter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
for (String jobId : jobIds) {
|
||||
constraintsByJobId.remove(jobId);
|
||||
dependenciesByJobId.remove(jobId);
|
||||
|
||||
for (Map.Entry<String, List<DependencySpec>> entry : dependenciesByJobId.entrySet()) {
|
||||
Iterator<DependencySpec> depedencyIter = entry.getValue().iterator();
|
||||
|
||||
while (depedencyIter.hasNext()) {
|
||||
if (depedencyIter.next().getDependsOnJobId().equals(jobId)) {
|
||||
depedencyIter.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @NonNull List<ConstraintSpec> getConstraintSpecs(@NonNull String jobId) {
|
||||
return Util.getOrDefault(constraintsByJobId, jobId, new LinkedList<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @NonNull List<ConstraintSpec> getAllConstraintSpecs() {
|
||||
return Stream.of(constraintsByJobId)
|
||||
.map(Map.Entry::getValue)
|
||||
.flatMap(Stream::of)
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized @NonNull List<DependencySpec> getDependencySpecsThatDependOnJob(@NonNull String jobSpecId) {
|
||||
List<DependencySpec> layer = getSingleLayerOfDependencySpecsThatDependOnJob(jobSpecId);
|
||||
List<DependencySpec> all = new ArrayList<>(layer);
|
||||
|
||||
Set<String> activeJobIds;
|
||||
|
||||
do {
|
||||
activeJobIds = Stream.of(layer).map(DependencySpec::getJobId).collect(Collectors.toSet());
|
||||
layer.clear();
|
||||
|
||||
for (String activeJobId : activeJobIds) {
|
||||
layer.addAll(getSingleLayerOfDependencySpecsThatDependOnJob(activeJobId));
|
||||
}
|
||||
|
||||
all.addAll(layer);
|
||||
} while (!layer.isEmpty());
|
||||
|
||||
return all;
|
||||
}
|
||||
|
||||
private @NonNull List<DependencySpec> getSingleLayerOfDependencySpecsThatDependOnJob(@NonNull String jobSpecId) {
|
||||
return Stream.of(dependenciesByJobId.entrySet())
|
||||
.map(Map.Entry::getValue)
|
||||
.flatMap(Stream::of)
|
||||
.filter(j -> j.getDependsOnJobId().equals(jobSpecId))
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull List<DependencySpec> getAllDependencySpecs() {
|
||||
return Stream.of(dependenciesByJobId)
|
||||
.map(Map.Entry::getValue)
|
||||
.flatMap(Stream::of)
|
||||
.toList();
|
||||
}
|
||||
|
||||
private JobSpec getJobById(@NonNull String id) {
|
||||
for (JobSpec job : jobs) {
|
||||
if (job.getId().equals(id)) {
|
||||
return job;
|
||||
}
|
||||
}
|
||||
Log.w(TAG, "Was looking for job with ID JOB::" + id + ", but it doesn't exist in memory!");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,278 @@
|
||||
package org.thoughtcrime.securesms.jobs
|
||||
|
||||
import org.thoughtcrime.securesms.database.JobDatabase
|
||||
import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.DependencySpec
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.FullSpec
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
|
||||
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage
|
||||
|
||||
class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
|
||||
|
||||
private val jobs: MutableList<JobSpec> = mutableListOf()
|
||||
private val constraintsByJobId: MutableMap<String, MutableList<ConstraintSpec>> = mutableMapOf()
|
||||
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = mutableMapOf()
|
||||
|
||||
@Synchronized
|
||||
override fun init() {
|
||||
jobs += jobDatabase.allJobSpecs
|
||||
|
||||
for (constraintSpec in jobDatabase.allConstraintSpecs) {
|
||||
val jobConstraints: MutableList<ConstraintSpec> = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() }
|
||||
jobConstraints += constraintSpec
|
||||
}
|
||||
|
||||
for (dependencySpec in jobDatabase.allDependencySpecs) {
|
||||
val jobDependencies: MutableList<DependencySpec> = dependenciesByJobId.getOrPut(dependencySpec.jobId) { mutableListOf() }
|
||||
jobDependencies += dependencySpec
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun insertJobs(fullSpecs: List<FullSpec>) {
|
||||
val durable: List<FullSpec> = fullSpecs.filterNot { it.isMemoryOnly }
|
||||
|
||||
if (durable.isNotEmpty()) {
|
||||
jobDatabase.insertJobs(durable)
|
||||
}
|
||||
|
||||
for (fullSpec in fullSpecs) {
|
||||
jobs += fullSpec.jobSpec
|
||||
constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs
|
||||
dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getJobSpec(id: String): JobSpec? {
|
||||
return jobs.firstOrNull { it.id == id }
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getAllJobSpecs(): List<JobSpec> {
|
||||
return ArrayList(jobs)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec> {
|
||||
val migrationJob: JobSpec? = getMigrationJob()
|
||||
|
||||
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.nextRunAttemptTime <= currentTime) {
|
||||
listOf(migrationJob)
|
||||
} else if (migrationJob != null) {
|
||||
emptyList()
|
||||
} else {
|
||||
jobs
|
||||
.groupBy { it.queueKey ?: it.id }
|
||||
.map { byQueueKey: Map.Entry<String, List<JobSpec>> ->
|
||||
byQueueKey.value.minByOrNull { it.createTime }
|
||||
}
|
||||
.filterNotNull()
|
||||
.filter { job ->
|
||||
dependenciesByJobId[job.id].isNullOrEmpty()
|
||||
}
|
||||
.filterNot { it.isRunning }
|
||||
.filter { job -> job.nextRunAttemptTime <= currentTime }
|
||||
.sortedBy { it.createTime }
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getJobsInQueue(queue: String): List<JobSpec> {
|
||||
return jobs
|
||||
.filter { it.queueKey == queue }
|
||||
.sortedBy { it.createTime }
|
||||
}
|
||||
|
||||
private fun getMigrationJob(): JobSpec? {
|
||||
return jobs
|
||||
.filter { it.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY }
|
||||
.filter { firstInQueue(it) }
|
||||
.firstOrNull()
|
||||
}
|
||||
|
||||
private fun firstInQueue(job: JobSpec): Boolean {
|
||||
return if (job.queueKey == null) {
|
||||
true
|
||||
} else {
|
||||
val firstInQueue: JobSpec? = jobs
|
||||
.filter { it.queueKey == job.queueKey }
|
||||
.minByOrNull { it.createTime }
|
||||
|
||||
job == firstInQueue
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getJobCountForFactory(factoryKey: String): Int {
|
||||
return jobs
|
||||
.filter { it.factoryKey == factoryKey }
|
||||
.size
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getJobCountForFactoryAndQueue(factoryKey: String, queueKey: String): Int {
|
||||
return jobs
|
||||
.filter { it.factoryKey == factoryKey && it.queueKey == queueKey }
|
||||
.size
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun areQueuesEmpty(queueKeys: Set<String>): Boolean {
|
||||
return jobs.none { it.queueKey != null && queueKeys.contains(it.queueKey) }
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun updateJobRunningState(id: String, isRunning: Boolean) {
|
||||
val job: JobSpec? = getJobById(id)
|
||||
if (job == null || !job.isMemoryOnly) {
|
||||
jobDatabase.updateJobRunningState(id, isRunning)
|
||||
}
|
||||
|
||||
val iter = jobs.listIterator()
|
||||
|
||||
while (iter.hasNext()) {
|
||||
val current: JobSpec = iter.next()
|
||||
if (current.id == id) {
|
||||
iter.set(current.copy(isRunning = isRunning))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun updateJobAfterRetry(id: String, isRunning: Boolean, runAttempt: Int, nextRunAttemptTime: Long, serializedData: ByteArray?) {
|
||||
val job = getJobById(id)
|
||||
if (job == null || !job.isMemoryOnly) {
|
||||
jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData)
|
||||
}
|
||||
|
||||
val iter = jobs.listIterator()
|
||||
while (iter.hasNext()) {
|
||||
val current = iter.next()
|
||||
if (current.id == id) {
|
||||
iter.set(
|
||||
current.copy(
|
||||
isRunning = isRunning,
|
||||
runAttempt = runAttempt,
|
||||
nextRunAttemptTime = nextRunAttemptTime,
|
||||
serializedData = serializedData
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun updateAllJobsToBePending() {
|
||||
jobDatabase.updateAllJobsToBePending()
|
||||
|
||||
val iter = jobs.listIterator()
|
||||
while (iter.hasNext()) {
|
||||
val current = iter.next()
|
||||
iter.set(current.copy(isRunning = false))
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun updateJobs(jobSpecs: List<JobSpec>) {
|
||||
val durable: List<JobSpec> = jobSpecs
|
||||
.filter { updatedJob ->
|
||||
val found = getJobById(updatedJob.id)
|
||||
found != null && !found.isMemoryOnly
|
||||
}
|
||||
|
||||
if (durable.isNotEmpty()) {
|
||||
jobDatabase.updateJobs(durable)
|
||||
}
|
||||
|
||||
val updatesById: Map<String, JobSpec> = jobSpecs.associateBy { it.id }
|
||||
|
||||
val iter = jobs.listIterator()
|
||||
while (iter.hasNext()) {
|
||||
val current = iter.next()
|
||||
val update = updatesById[current.id]
|
||||
|
||||
if (update != null) {
|
||||
iter.set(update)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun deleteJob(jobId: String) {
|
||||
deleteJobs(listOf(jobId))
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun deleteJobs(jobIds: List<String>) {
|
||||
val durableIds: List<String> = jobIds
|
||||
.mapNotNull { getJobById(it) }
|
||||
.filterNot { it.isMemoryOnly }
|
||||
.map { it.id }
|
||||
|
||||
if (durableIds.isNotEmpty()) {
|
||||
jobDatabase.deleteJobs(durableIds)
|
||||
}
|
||||
|
||||
val deleteIds: Set<String> = jobIds.toSet()
|
||||
jobs.removeIf { deleteIds.contains(it.id) }
|
||||
|
||||
for (jobId in jobIds) {
|
||||
constraintsByJobId.remove(jobId)
|
||||
dependenciesByJobId.remove(jobId)
|
||||
|
||||
for (dependencyList in dependenciesByJobId.values) {
|
||||
val iter = dependencyList.iterator()
|
||||
|
||||
while (iter.hasNext()) {
|
||||
if (iter.next().dependsOnJobId == jobId) {
|
||||
iter.remove()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getConstraintSpecs(jobId: String): List<ConstraintSpec> {
|
||||
return constraintsByJobId.getOrElse(jobId) { listOf() }
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getAllConstraintSpecs(): List<ConstraintSpec> {
|
||||
return constraintsByJobId.values.flatten()
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun getDependencySpecsThatDependOnJob(jobSpecId: String): List<DependencySpec> {
|
||||
val all: MutableList<DependencySpec> = mutableListOf()
|
||||
|
||||
var dependencyLayer: List<DependencySpec> = getSingleLayerOfDependencySpecsThatDependOnJob(jobSpecId)
|
||||
|
||||
while (dependencyLayer.isNotEmpty()) {
|
||||
all += dependencyLayer
|
||||
|
||||
dependencyLayer = dependencyLayer
|
||||
.map { getSingleLayerOfDependencySpecsThatDependOnJob(it.jobId) }
|
||||
.flatten()
|
||||
}
|
||||
|
||||
return all
|
||||
}
|
||||
|
||||
private fun getSingleLayerOfDependencySpecsThatDependOnJob(jobSpecId: String): List<DependencySpec> {
|
||||
return dependenciesByJobId
|
||||
.values
|
||||
.flatten()
|
||||
.filter { it.dependsOnJobId == jobSpecId }
|
||||
}
|
||||
|
||||
override fun getAllDependencySpecs(): List<DependencySpec> {
|
||||
return dependenciesByJobId.values.flatten()
|
||||
}
|
||||
|
||||
private fun getJobById(id: String): JobSpec? {
|
||||
return jobs.firstOrNull { it.id == id }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user