Make FastJobStorage synchronous again.

This commit is contained in:
Greyson Parrelli
2020-12-03 09:58:08 -05:00
parent ebaa4cee65
commit 7868c3094b
6 changed files with 46 additions and 83 deletions

View File

@@ -145,7 +145,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
.setJobFactories(JobManagerFactories.getJobFactories(context))
.setConstraintFactories(JobManagerFactories.getConstraintFactories(context))
.setConstraintObservers(JobManagerFactories.getConstraintObservers(context))
- .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context), SignalExecutors.newCachedSingleThreadExecutor("signal-fast-job-storage")))
+ .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(context)))
.setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context)))
.addReservedJobRunner(new FactoryJobPredicate(PushDecryptMessageJob.KEY, PushProcessMessageJob.KEY, MarkerJob.KEY))
.addReservedJobRunner(new FactoryJobPredicate(PushTextSendJob.KEY, PushMediaSendJob.KEY, PushGroupSendJob.KEY, ReactionSendJob.KEY, TypingSendJob.KEY, GroupCallUpdateSendJob.KEY))

View File

@@ -77,11 +77,6 @@ class JobController {
notifyAll();
}
- @WorkerThread
- synchronized void flush() {
- jobStorage.flush();
- }
@WorkerThread
synchronized void submitNewJobChain(@NonNull List<List<Job>> chain) {
chain = Stream.of(chain).filterNot(List::isEmpty).toList();

View File

@@ -320,10 +320,7 @@ public class JobManager implements ConstraintObserver.Notifier {
public void flush() {
CountDownLatch latch = new CountDownLatch(1);
- runOnExecutor(() -> {
- jobController.flush();
- latch.countDown();
- });
+ runOnExecutor(latch::countDown);
try {
latch.await();

View File

@@ -11,9 +11,6 @@ public interface JobStorage {
@WorkerThread
void init();
- @WorkerThread
- void flush();
@WorkerThread
void insertJobs(@NonNull List<FullSpec> fullSpecs);

View File

@@ -35,15 +35,13 @@ public class FastJobStorage implements JobStorage {
private static final String TAG = Log.tag(FastJobStorage.class);
private final JobDatabase jobDatabase;
- private final Executor serialExecutor;
private final List<JobSpec> jobs;
private final Map<String, List<ConstraintSpec>> constraintsByJobId;
private final Map<String, List<DependencySpec>> dependenciesByJobId;
- public FastJobStorage(@NonNull JobDatabase jobDatabase, @NonNull Executor serialExecutor) {
+ public FastJobStorage(@NonNull JobDatabase jobDatabase) {
this.jobDatabase = jobDatabase;
- this.serialExecutor = serialExecutor;
this.jobs = new ArrayList<>();
this.constraintsByJobId = new HashMap<>();
this.dependenciesByJobId = new HashMap<>();
@@ -70,26 +68,11 @@ public class FastJobStorage implements JobStorage {
}
}
- @Override
- public synchronized void flush() {
- CountDownLatch latch = new CountDownLatch(1);
- serialExecutor.execute(latch::countDown);
- try {
- latch.await();
- } catch (InterruptedException e) {
- Log.w(TAG, "Interrupted while waiting to flush!", e);
- }
- }
@Override
public synchronized void insertJobs(@NonNull List<FullSpec> fullSpecs) {
List<FullSpec> durable = Stream.of(fullSpecs).filterNot(FullSpec::isMemoryOnly).toList();
if (durable.size() > 0) {
- serialExecutor.execute(() -> {
- jobDatabase.insertJobs(durable);
- });
+ jobDatabase.insertJobs(durable);
}
for (FullSpec fullSpec : fullSpecs) {
@@ -173,9 +156,7 @@ public class FastJobStorage implements JobStorage {
public synchronized void updateJobRunningState(@NonNull String id, boolean isRunning) {
JobSpec job = getJobById(id);
if (job == null || !job.isMemoryOnly()) {
- serialExecutor.execute(() -> {
- jobDatabase.updateJobRunningState(id, isRunning);
- });
+ jobDatabase.updateJobRunningState(id, isRunning);
}
ListIterator<JobSpec> iter = jobs.listIterator();
@@ -206,9 +187,7 @@ public class FastJobStorage implements JobStorage {
public synchronized void updateJobAfterRetry(@NonNull String id, boolean isRunning, int runAttempt, long nextRunAttemptTime, @NonNull String serializedData) {
JobSpec job = getJobById(id);
if (job == null || !job.isMemoryOnly()) {
- serialExecutor.execute(() -> {
- jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData);
- });
+ jobDatabase.updateJobAfterRetry(id, isRunning, runAttempt, nextRunAttemptTime, serializedData);
}
ListIterator<JobSpec> iter = jobs.listIterator();
@@ -237,9 +216,8 @@ public class FastJobStorage implements JobStorage {
@Override
public synchronized void updateAllJobsToBePending() {
- serialExecutor.execute(() -> {
- jobDatabase.updateAllJobsToBePending();
- });
+ jobDatabase.updateAllJobsToBePending();
ListIterator<JobSpec> iter = jobs.listIterator();
while (iter.hasNext()) {
@@ -273,9 +251,7 @@ public class FastJobStorage implements JobStorage {
}
if (durable.size() > 0) {
- serialExecutor.execute(() -> {
- jobDatabase.updateJobs(durable);
- });
+ jobDatabase.updateJobs(durable);
}
Map<String, JobSpec> updates = Stream.of(jobSpecs).collect(Collectors.toMap(JobSpec::getId));
@@ -307,9 +283,7 @@ public class FastJobStorage implements JobStorage {
}
if (durableIds.size() > 0) {
- serialExecutor.execute(() -> {
- jobDatabase.deleteJobs(durableIds);
- });
+ jobDatabase.deleteJobs(durableIds);
}
Set<String> deleteIds = new HashSet<>(jobIds);