Drop pub/sub operations from WebsocketConnection.

This commit is contained in:
Jon Chambers
2020-09-09 19:21:55 -04:00
committed by Jon Chambers
parent 4f2e06407b
commit 7e14a0bc30
8 changed files with 66 additions and 223 deletions

View File

@@ -4,17 +4,13 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.websocket.session.WebSocketSessionContext;
import org.whispersystems.websocket.setup.WebSocketConnectListener;
@@ -25,30 +21,23 @@ import static com.codahale.metrics.MetricRegistry.name;
public class AuthenticatedConnectListener implements WebSocketConnectListener {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Timer durationTimer = metricRegistry.timer(name(WebSocketConnection.class, "connected_duration" ));
private static final Timer unauthenticatedDurationTimer = metricRegistry.timer(name(WebSocketConnection.class, "unauthenticated_connection_duration"));
private static final Counter openWebsocketCounter = metricRegistry.counter(name(WebSocketConnection.class, "open_websockets"));
private final PushSender pushSender;
private final ReceiptSender receiptSender;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final ApnFallbackManager apnFallbackManager;
private final ClientPresenceManager clientPresenceManager;
public AuthenticatedConnectListener(PushSender pushSender,
ReceiptSender receiptSender,
public AuthenticatedConnectListener(ReceiptSender receiptSender,
MessagesManager messagesManager,
PubSubManager pubSubManager,
ApnFallbackManager apnFallbackManager,
ClientPresenceManager clientPresenceManager)
{
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.apnFallbackManager = apnFallbackManager;
this.clientPresenceManager = clientPresenceManager;
}
@@ -60,8 +49,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final Device device = account.getAuthenticatedDevice().get();
final String connectionId = String.valueOf(new SecureRandom().nextLong());
final Timer.Context timer = durationTimer.time();
final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
final WebSocketConnection connection = new WebSocketConnection(receiptSender,
messagesManager, account, device,
context.getClient(), connectionId);
@@ -70,15 +58,16 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
clientPresenceManager.setPresent(account.getUuid(), device.getId(), connection);
messagesManager.addMessageAvailabilityListener(account.getUuid(), device.getId(), connection);
pubSubManager.subscribe(address, connection);
connection.start();
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
@Override
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
openWebsocketCounter.dec();
pubSubManager.unsubscribe(address, connection);
clientPresenceManager.clearPresence(account.getUuid(), device.getId());
messagesManager.removeMessageAvailabilityListener(connection);
connection.stop();
openWebsocketCounter.dec();
timer.stop();
}
});

View File

@@ -44,7 +44,7 @@ import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class WebSocketConnection implements DispatchChannel, MessageAvailabilityListener, DisplacedPresenceListener {
public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
public static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
@@ -52,14 +52,11 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable"));
private static final Meter ephemeralMessageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "ephemeralMessagesAvailable"));
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
private static final Meter pubSubNewMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubNewMessage"));
private static final Meter pubSubPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "pubSubPersisted"));
private static final Meter displacementMeter = metricRegistry.meter(name(WebSocketConnection.class, "explicitDisplacement"));
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final ReceiptSender receiptSender;
private final PushSender pushSender;
private final MessagesManager messagesManager;
private final Account account;
@@ -77,15 +74,13 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
PERSISTED_NEW_MESSAGES_AVAILABLE
}
public WebSocketConnection(PushSender pushSender,
ReceiptSender receiptSender,
public WebSocketConnection(ReceiptSender receiptSender,
MessagesManager messagesManager,
Account account,
Device device,
WebSocketClient client,
String connectionId)
{
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.account = account;
@@ -94,38 +89,14 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
this.connectionId = connectionId;
}
@Override
public void onDispatchMessage(String channel, byte[] message) {
try {
PubSubMessage pubSubMessage = PubSubMessage.parseFrom(message);
switch (pubSubMessage.getType().getNumber()) {
case PubSubMessage.Type.QUERY_DB_VALUE:
pubSubPersistedMeter.mark();
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
processStoredMessages();
break;
case PubSubMessage.Type.DELIVER_VALUE:
pubSubNewMessageMeter.mark();
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty());
break;
default:
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
}
} catch (InvalidProtocolBufferException e) {
logger.warn("Protobuf parse error", e);
}
}
@Override
public void onDispatchUnsubscribed(String channel) {
client.close(1000, "OK");
}
public void onDispatchSubscribed(String channel) {
public void start() {
processStoredMessages();
}
public void stop() {
client.close(1000, "OK");
}
private CompletableFuture<WebSocketResponseMessage> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
try {
String header;
@@ -143,20 +114,16 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> {
if (throwable == null) {
boolean isReceipt = message.getType() == Envelope.Type.RECEIPT;
if (isSuccessResponse(response) && !isReceipt) {
messageTime.update(System.currentTimeMillis() - message.getTimestamp());
}
if (isSuccessResponse(response)) {
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
if (!isReceipt) sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
requeueMessage(message);
if (storedMessageInfo.isPresent()) {
messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
}
if (message.getType() != Envelope.Type.RECEIPT) {
messageTime.update(System.currentTimeMillis() - message.getTimestamp());
sendDeliveryReceiptFor(message);
}
}
} else {
if (!storedMessageInfo.isPresent()) requeueMessage(message);
}
});
} catch (CryptoEncodingException e) {
@@ -165,16 +132,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
}
}
private void requeueMessage(Envelope message) {
pushSender.getWebSocketSender().queueMessage(account, device, message);
try {
pushSender.sendQueuedNotification(account, device);
} catch (NotPushRegisteredException e) {
logger.warn("requeueMessage", e);
}
}
private void sendDeliveryReceiptFor(Envelope message) {
if (!message.hasSource()) return;