Refactor ProcessScheduledJobsServiceCommand to dispose of processing jobs on shutdown

This commit is contained in:
Chris Eager
2024-09-24 13:18:28 -05:00
committed by Chris Eager
parent 946a486c4b
commit e20a4c1f77
4 changed files with 176 additions and 23 deletions

View File

@@ -136,7 +136,7 @@ public abstract class JobScheduler {
*
* @see #processJob(byte[])
*/
public CompletableFuture<Void> processAvailableJobs() {
public Mono<Void> processAvailableJobs() {
return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#schedulerName = :schedulerName AND #runAt <= :maxRunAt")
@@ -164,8 +164,7 @@ public abstract class JobScheduler {
return Mono.empty();
});
}, MAX_CONCURRENCY)
.then()
.toFuture();
.then();
}
private CompletableFuture<Void> deleteJob(final AttributeValue schedulerName, final AttributeValue runAt) {

View File

@@ -1,5 +1,6 @@
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.ServerCommand;
import io.dropwizard.core.server.DefaultServerFactory;
@@ -7,6 +8,7 @@ import io.dropwizard.core.setup.Environment;
import io.dropwizard.jetty.HttpsConnectorFactory;
import io.dropwizard.lifecycle.Managed;
import io.dropwizard.util.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -18,6 +20,8 @@ import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import reactor.core.Disposable;
import reactor.core.Disposables;
public class ProcessScheduledJobsServiceCommand extends ServerCommand<WhisperServerConfiguration> {
@@ -31,7 +35,8 @@ public class ProcessScheduledJobsServiceCommand extends ServerCommand<WhisperSer
private static final Logger log = LoggerFactory.getLogger(ProcessScheduledJobsServiceCommand.class);
private static class ScheduledJobProcessor implements Managed {
@VisibleForTesting
static class ScheduledJobProcessor implements Managed {
private final JobScheduler jobScheduler;
@@ -39,8 +44,11 @@ public class ProcessScheduledJobsServiceCommand extends ServerCommand<WhisperSer
private final int fixedDelaySeconds;
private ScheduledFuture<?> processJobsFuture;
private Disposable processAvailableJobsDisposableReference = Disposables.disposed();
private boolean stopped = false;
private ScheduledJobProcessor(final JobScheduler jobScheduler,
@VisibleForTesting
ScheduledJobProcessor(final JobScheduler jobScheduler,
final ScheduledExecutorService scheduledExecutorService,
final int fixedDelaySeconds) {
@@ -52,20 +60,41 @@ public class ProcessScheduledJobsServiceCommand extends ServerCommand<WhisperSer
@Override
public void start() {
processJobsFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
final CountDownLatch latch = new CountDownLatch(1);
synchronized (this) {
if (stopped) {
return;
}
processAvailableJobsDisposableReference = jobScheduler.processAvailableJobs()
// this CountDownLatch pattern is how Mono.block() is implemented
.doOnCancel(latch::countDown)
.doOnTerminate(latch::countDown)
.doOnError(e ->
log.warn("Failed to process available jobs for scheduler: {}", jobScheduler.getSchedulerName(), e))
.subscribe();
}
try {
jobScheduler.processAvailableJobs().join();
} catch (final Exception e) {
latch.await();
} catch (final InterruptedException e) {
log.warn("Failed to process available jobs for scheduler: {}", jobScheduler.getSchedulerName(), e);
}
}, 0, fixedDelaySeconds, TimeUnit.SECONDS);
}
@Override
public void stop() {
public synchronized void stop() {
stopped = true;
if (processJobsFuture != null) {
processJobsFuture.cancel(false);
}
processAvailableJobsDisposableReference.dispose();
processJobsFuture = null;
}
}