Don't send a reply to clients until messages are safely in a non-volatile store.

This commit is contained in:
Jon Chambers
2020-10-09 20:58:10 -04:00
committed by Jon Chambers
parent 321e6e6679
commit bac268a21c
6 changed files with 17 additions and 187 deletions

View File

@@ -328,7 +328,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerClient, apnSender, accountsManager);
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
SmsSender smsSender = new SmsSender(twilioSmsSender);
MessageSender messageSender = new MessageSender(apnFallbackManager, clientPresenceManager, messagesManager, gcmSender, apnSender, config.getPushConfiguration().getQueueSize(), pushLatencyManager);
MessageSender messageSender = new MessageSender(apnFallbackManager, clientPresenceManager, messagesManager, gcmSender, apnSender, pushLatencyManager);
ReceiptSender receiptSender = new ReceiptSender(accountsManager, messageSender);
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());

View File

@@ -4,9 +4,7 @@
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
@@ -14,17 +12,12 @@ import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.BlockingThreadPoolExecutor;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
/**
@@ -47,8 +40,6 @@ public class MessageSender implements Managed {
private final MessagesManager messagesManager;
private final GCMSender gcmSender;
private final APNSender apnSender;
private final ExecutorService executor;
private final int queueSize;
private final PushLatencyManager pushLatencyManager;
private static final String SEND_COUNTER_NAME = name(MessageSender.class, "sendMessage");
@@ -61,40 +52,13 @@ public class MessageSender implements Managed {
MessagesManager messagesManager,
GCMSender gcmSender,
APNSender apnSender,
int queueSize,
PushLatencyManager pushLatencyManager)
{
this(apnFallbackManager,
clientPresenceManager,
messagesManager,
gcmSender,
apnSender,
queueSize,
new BlockingThreadPoolExecutor("pushSender", 50, queueSize),
pushLatencyManager);
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
.register(name(MessageSender.class, "send_queue_depth"),
(Gauge<Integer>) ((BlockingThreadPoolExecutor)executor)::getSize);
}
@VisibleForTesting
MessageSender(ApnFallbackManager apnFallbackManager,
ClientPresenceManager clientPresenceManager,
MessagesManager messagesManager,
GCMSender gcmSender,
APNSender apnSender,
int queueSize,
ExecutorService executor,
PushLatencyManager pushLatencyManager) {
this.apnFallbackManager = apnFallbackManager;
this.clientPresenceManager = clientPresenceManager;
this.messagesManager = messagesManager;
this.gcmSender = gcmSender;
this.apnSender = apnSender;
this.queueSize = queueSize;
this.executor = executor;
this.pushLatencyManager = pushLatencyManager;
}
@@ -105,15 +69,6 @@ public class MessageSender implements Managed {
throw new NotPushRegisteredException("No delivery possible!");
}
if (queueSize > 0) {
executor.execute(() -> sendSynchronousMessage(account, device, message, online));
} else {
sendSynchronousMessage(account, device, message, online);
}
}
@VisibleForTesting
void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
final String channel;
if (device.getGcmId() != null) {
@@ -193,10 +148,7 @@ public class MessageSender implements Managed {
}
@Override
public void stop() throws Exception {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
public void stop() {
apnSender.stop();
}
}

View File

@@ -158,8 +158,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
final byte[] ephemeralQueueKey = getEphemeralMessageQueueKey(destinationUuid, destinationDevice);
redisCluster.useBinaryCluster(connection -> {
connection.async().rpush(ephemeralQueueKey, message.toByteArray());
connection.async().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds());
connection.sync().rpush(ephemeralQueueKey, message.toByteArray());
connection.sync().expire(ephemeralQueueKey, MAX_EPHEMERAL_MESSAGE_DELAY.toSeconds());
});
});
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.util;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
private final Semaphore semaphore;
private final Timer acquirePermitTimer;
public BlockingThreadPoolExecutor(String name, int threads, int bound) {
super(threads, threads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
this.semaphore = new Semaphore(bound);
final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
this.acquirePermitTimer = metricRegistry.timer(name(getClass(), name, "acquirePermit"));
metricRegistry.gauge(name(getClass(), name, "permitsAvailable"), () -> semaphore::availablePermits);
}
@Override
public void execute(Runnable task) {
try (final Timer.Context ignored = acquirePermitTimer.time()) {
semaphore.acquireUninterruptibly();
}
try {
super.execute(task);
} catch (Throwable t) {
semaphore.release();
throw new RuntimeException(t);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
semaphore.release();
}
public int getSize() {
return ((LinkedBlockingQueue)getQueue()).size();
}
}