Wait for outstanding requests to be resolved before shutting down the directory queue.

This commit is contained in:
Jon Chambers
2021-07-26 12:56:33 -04:00
committed by Jon Chambers
parent 34dbff6786
commit 3608c5bfb0
3 changed files with 67 additions and 1 deletions

View File

@@ -515,6 +515,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(currencyManager);
environment.lifecycle().manage(torExitNodeManager);
environment.lifecycle().manage(asnManager);
environment.lifecycle().manage(directoryQueue);
StaticCredentialsProvider cdnCredentialsProvider = StaticCredentialsProvider
.create(AwsBasicCredentials.create(

View File

@@ -11,9 +11,13 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import io.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.SqsConfiguration;
@@ -28,7 +32,7 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
public class DirectoryQueue {
public class DirectoryQueue implements Managed {
private static final Logger logger = LoggerFactory.getLogger(DirectoryQueue.class);
@@ -40,6 +44,8 @@ public class DirectoryQueue {
private final List<String> queueUrls;
private final SqsAsyncClient sqs;
private final Set<SendMessageRequest> outstandingRequests = Collections.newSetFromMap(new IdentityHashMap<>());
private enum UpdateAction {
ADD("add"),
DELETE("delete");
@@ -73,6 +79,21 @@ public class DirectoryQueue {
this.sqs = sqs;
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
synchronized (outstandingRequests) {
while (!outstandingRequests.isEmpty()) {
outstandingRequests.wait();
}
}
sqs.close();
}
public boolean isDiscoverable(final Account account) {
return account.isEnabled() && account.isDiscoverableByPhoneNumber();
}
@@ -101,6 +122,10 @@ public class DirectoryQueue {
))
.build();
synchronized (outstandingRequests) {
outstandingRequests.add(request);
}
sqs.sendMessage(request).whenComplete((response, cause) -> {
try {
if (cause instanceof SdkServiceException) {
@@ -113,6 +138,11 @@ public class DirectoryQueue {
logger.warn("sqs unexpected error", cause);
}
} finally {
synchronized (outstandingRequests) {
outstandingRequests.remove(request);
outstandingRequests.notifyAll();
}
timerContext.close();
}
});