Collapse WebsocketSender into PushSender.

This commit is contained in:
Jon Chambers
2020-09-18 15:55:53 -04:00
committed by Jon Chambers
parent 5e30b0499a
commit 74b3daa70a
5 changed files with 239 additions and 155 deletions

View File

@@ -106,7 +106,6 @@ import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.WebsocketSender;
import org.whispersystems.textsecuregcm.recaptcha.RecaptchaClient;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
@@ -313,7 +312,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, clientPresenceManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(pubSubManager);
@@ -331,7 +329,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ApnFallbackManager apnFallbackManager = new ApnFallbackManager(pushSchedulerClient, apnSender, accountsManager);
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
SmsSender smsSender = new SmsSender(twilioSmsSender);
PushSender pushSender = new PushSender(apnFallbackManager, gcmSender, apnSender, websocketSender, config.getPushConfiguration().getQueueSize(), pushLatencyManager);
PushSender pushSender = new PushSender(apnFallbackManager, clientPresenceManager, messagesManager, gcmSender, apnSender, config.getPushConfiguration().getQueueSize(), pushLatencyManager);
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender);
TurnTokenGenerator turnTokenGenerator = new TurnTokenGenerator(config.getTurnConfiguration());
RecaptchaClient recaptchaClient = new RecaptchaClient(config.getRecaptchaConfiguration().getSecret());

View File

@@ -18,17 +18,21 @@ package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.BlockingThreadPoolExecutor;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Util;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
@@ -37,33 +41,60 @@ import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
public class PushSender implements Managed {
@SuppressWarnings("unused")
private final Logger logger = LoggerFactory.getLogger(PushSender.class);
private final ApnFallbackManager apnFallbackManager;
private final ClientPresenceManager clientPresenceManager;
private final MessagesManager messagesManager;
private final GCMSender gcmSender;
private final APNSender apnSender;
private final WebsocketSender webSocketSender;
private final BlockingThreadPoolExecutor executor;
private final ExecutorService executor;
private final int queueSize;
private final PushLatencyManager pushLatencyManager;
public PushSender(ApnFallbackManager apnFallbackManager,
GCMSender gcmSender, APNSender apnSender,
WebsocketSender websocketSender, int queueSize,
PushLatencyManager pushLatencyManager)
private static final String SEND_COUNTER_NAME = name(PushSender.class, "sendMessage");
private static final String CHANNEL_TAG_NAME = "channel";
private static final String EPHEMERAL_TAG_NAME = "ephemeral";
private static final String CLIENT_ONLINE_TAG_NAME = "clientOnline";
public PushSender(ApnFallbackManager apnFallbackManager,
ClientPresenceManager clientPresenceManager,
MessagesManager messagesManager,
GCMSender gcmSender,
APNSender apnSender,
int queueSize,
PushLatencyManager pushLatencyManager)
{
this.apnFallbackManager = apnFallbackManager;
this.gcmSender = gcmSender;
this.apnSender = apnSender;
this.webSocketSender = websocketSender;
this.queueSize = queueSize;
this.executor = new BlockingThreadPoolExecutor("pushSender", 50, queueSize);
this.pushLatencyManager = pushLatencyManager;
this(apnFallbackManager,
clientPresenceManager,
messagesManager,
gcmSender,
apnSender,
queueSize,
new BlockingThreadPoolExecutor("pushSender", 50, queueSize),
pushLatencyManager);
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
.register(name(PushSender.class, "send_queue_depth"),
(Gauge<Integer>) executor::getSize);
(Gauge<Integer>) ((BlockingThreadPoolExecutor)executor)::getSize);
}
@VisibleForTesting
PushSender(ApnFallbackManager apnFallbackManager,
ClientPresenceManager clientPresenceManager,
MessagesManager messagesManager,
GCMSender gcmSender,
APNSender apnSender,
int queueSize,
ExecutorService executor,
PushLatencyManager pushLatencyManager) {
this.apnFallbackManager = apnFallbackManager;
this.clientPresenceManager = clientPresenceManager;
this.messagesManager = messagesManager;
this.gcmSender = gcmSender;
this.apnSender = apnSender;
this.queueSize = queueSize;
this.executor = executor;
this.pushLatencyManager = pushLatencyManager;
}
public void sendMessage(final Account account, final Device device, final Envelope message, boolean online)
@@ -80,19 +111,44 @@ public class PushSender implements Managed {
}
}
private void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
if (device.getGcmId() != null) sendGcmMessage(account, device, message, online);
else if (device.getApnId() != null) sendApnMessage(account, device, message, online);
else if (device.getFetchesMessages()) sendWebSocketMessage(account, device, message, online);
else throw new AssertionError();
}
@VisibleForTesting
void sendSynchronousMessage(Account account, Device device, Envelope message, boolean online) {
final String channel;
private void sendGcmMessage(Account account, Device device, Envelope message, boolean online) {
final boolean delivered = webSocketSender.sendMessage(account, device, message, WebsocketSender.Type.GCM, online);
if (!delivered && !online) {
sendGcmNotification(account, device);
if (device.getGcmId() != null) {
channel = "gcm";
} else if (device.getApnId() != null) {
channel = "apn";
} else if (device.getFetchesMessages()) {
channel = "websocket";
} else {
throw new AssertionError();
}
final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId());
if (online) {
if (clientPresent) {
messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
}
} else {
messagesManager.insert(account.getUuid(), device.getId(), message);
if (!clientPresent) {
if (!Util.isEmpty(device.getGcmId())) {
sendGcmNotification(account, device);
} else if (!Util.isEmpty(device.getApnId()) || !Util.isEmpty(device.getVoipApnId())) {
sendApnNotification(account, device);
}
}
}
final List<Tag> tags = List.of(
Tag.of(CHANNEL_TAG_NAME, channel),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)),
Tag.of(CLIENT_ONLINE_TAG_NAME, String.valueOf(clientPresent)));
Metrics.counter(SEND_COUNTER_NAME, tags).increment();
}
private void sendGcmNotification(Account account, Device device) {
@@ -104,21 +160,9 @@ public class PushSender implements Managed {
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
}
private void sendApnMessage(Account account, Device device, Envelope outgoingMessage, boolean online) {
final boolean delivered = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN, online);
if (!delivered && outgoingMessage.getType() != Envelope.Type.RECEIPT && !online) {
sendApnNotification(account, device, false);
}
}
private void sendApnNotification(Account account, Device device, boolean newOnly) {
private void sendApnNotification(Account account, Device device) {
ApnMessage apnMessage;
if (newOnly && RedisOperation.unchecked(() -> apnFallbackManager.isScheduled(account, device))) {
return;
}
if (!Util.isEmpty(device.getVoipApnId())) {
apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), device.getId(), true, Optional.empty());
RedisOperation.unchecked(() -> apnFallbackManager.schedule(account, device));
@@ -131,13 +175,8 @@ public class PushSender implements Managed {
RedisOperation.unchecked(() -> pushLatencyManager.recordPushSent(account.getUuid(), device.getId()));
}
private void sendWebSocketMessage(Account account, Device device, Envelope outgoingMessage, boolean online)
{
webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.WEB, online);
}
@Override
public void start() throws Exception {
public void start() {
apnSender.start();
gcmSender.start();
}
@@ -150,5 +189,4 @@ public class PushSender implements Managed {
apnSender.stop();
gcmSender.stop();
}
}

View File

@@ -1,97 +0,0 @@
/*
* Copyright (C) 2014 Open WhisperSystems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
public class WebsocketSender {
public enum Type {
APN,
GCM,
WEB
}
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(WebsocketSender.class);
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" ));
private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" ));
private final Meter apnOnlineMeter = metricRegistry.meter(name(getClass(), "apn_online" ));
private final Meter apnOfflineMeter = metricRegistry.meter(name(getClass(), "apn_offline"));
private final Meter gcmOnlineMeter = metricRegistry.meter(name(getClass(), "gcm_online" ));
private final Meter gcmOfflineMeter = metricRegistry.meter(name(getClass(), "gcm_offline"));
private final Counter ephemeralOnlineCounter = Metrics.counter(name(getClass(), "ephemeral"), "online", "true");
private final Counter ephemeralOfflineCounter = Metrics.counter(name(getClass(), "ephemeral"), "offline", "true");
private final MessagesManager messagesManager;
private final ClientPresenceManager clientPresenceManager;
public WebsocketSender(MessagesManager messagesManager, ClientPresenceManager clientPresenceManager) {
this.messagesManager = messagesManager;
this.clientPresenceManager = clientPresenceManager;
}
public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId());
if (online) {
if (clientPresent) {
ephemeralOnlineCounter.increment();
messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
return true;
} else {
ephemeralOfflineCounter.increment();
return false;
}
} else {
messagesManager.insert(account.getUuid(), device.getId(), message);
if (clientPresent) {
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();
return true;
} else {
if (channel == Type.APN) apnOfflineMeter.mark();
else if (channel == Type.GCM) gcmOfflineMeter.mark();
else websocketOfflineMeter.mark();
return false;
}
}
}
}