Revert keyspace delivery for all messages

* Revert "Send all messages via keyspace notifications when a feature flag is enabled."

This reverts commit fadcf62166.

* Revert "Consolidate semaphore release logic."

This reverts commit c02b255766.

* Revert "Represent stored message state as an enumeration rather than a collection of booleans."

This reverts commit 89788fa665.

* Revert "Refactor: collapse state into semaphores/atomic booleans."

This reverts commit a052e2ee8f.

* Revert "Refactor: move sendNextMessagePage into its own method."

This reverts commit 158e5004b7.

* Revert "Avoid querying the database if we think all new messages are in the cache."

This reverts commit 6f9ff3be37.

* Revert "Query for more stored messages if an update happens while we're already processing a batch."

This reverts commit f766c57743.

* Revert "Only send the "queue cleared" message once per websocket session."

This reverts commit 8f53152c3e.

* Revert "Let processStoredMessages handle requery logic."

This reverts commit 7bbc88d716.

* Revert "Only allow one thread to process stored messages at a time."

This reverts commit 68256d2343.
This commit is contained in:
Jon Chambers
2020-09-14 15:35:10 -04:00
committed by GitHub
parent c660daf4c2
commit 4dc49604b6
7 changed files with 47 additions and 426 deletions

View File

@@ -300,7 +300,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, featureFlagsManager);
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);

View File

@@ -193,7 +193,7 @@ public class MessageController {
final OutgoingMessageEntityList outgoingMessages = messagesManager.getMessagesForDevice(account.getNumber(),
account.getUuid(),
account.getAuthenticatedDevice().get().getId(),
userAgent, false);
userAgent);
outgoingMessageListSizeHistogram.update(outgoingMessages.getMessages().size());

View File

@@ -69,43 +69,21 @@ public class WebsocketSender {
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
private final ClientPresenceManager clientPresenceManager;
private final FeatureFlagsManager featureFlagsManager;
private static final String KEYSPACE_DELIVERY_FEATURE_FLAG = "keyspace-delivery-for-all-messages";
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, final FeatureFlagsManager featureFlagsManager) {
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
this.clientPresenceManager = clientPresenceManager;
this.featureFlagsManager = featureFlagsManager;
}
public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId());
if (online) {
if (clientPresent) {
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
ephemeralOnlineCounter.increment();
messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
return true;
} else {
ephemeralOfflineCounter.increment();
return false;
}
} else if (featureFlagsManager.isFeatureFlagActive(KEYSPACE_DELIVERY_FEATURE_FLAG)) {
messagesManager.insert(account.getUuid(), device.getId(), message);
if (clientPresent) {
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();
return true;
} else {
if (channel == Type.APN) apnOfflineMeter.mark();
else if (channel == Type.GCM) gcmOfflineMeter.mark();
else websocketOfflineMeter.mark();
return false;
}
} else {
@@ -117,7 +95,7 @@ public class WebsocketSender {
pubSubManager.publish(address, pubSubMessage);
if (clientPresent) {
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
if (channel == Type.APN) apnOnlineMeter.mark();
else if (channel == Type.GCM) gcmOnlineMeter.mark();
else websocketOnlineMeter.mark();

View File

@@ -11,7 +11,6 @@ import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.util.Constants;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -50,10 +49,10 @@ public class MessagesManager {
return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice);
}
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent, final boolean cachedMessagesOnly) {
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
List<OutgoingMessageEntity> messages = cachedMessagesOnly ? new ArrayList<>() : this.messages.load(destination, destinationDevice);
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
messages.addAll(messagesCache.get(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));

View File

@@ -4,7 +4,6 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
@@ -32,12 +31,9 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.ws.rs.WebApplicationException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@@ -67,16 +63,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
private final WebSocketClient client;
private final String connectionId;
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
private final AtomicReference<StoredMessageState> storedMessageState = new AtomicReference<>(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
private enum StoredMessageState {
EMPTY,
CACHED_NEW_MESSAGES_AVAILABLE,
PERSISTED_NEW_MESSAGES_AVAILABLE
}
public WebSocketConnection(PushSender pushSender,
ReceiptSender receiptSender,
MessagesManager messagesManager,
@@ -102,12 +88,11 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
switch (pubSubMessage.getType().getNumber()) {
case PubSubMessage.Type.QUERY_DB_VALUE:
pubSubPersistedMeter.mark();
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
processStoredMessages();
break;
case PubSubMessage.Type.DELIVER_VALUE:
pubSubNewMessageMeter.mark();
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty());
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
break;
default:
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
@@ -126,7 +111,10 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
processStoredMessages();
}
private CompletableFuture<Void> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
private void sendMessage(final Envelope message,
final Optional<StoredMessageInfo> storedMessageInfo,
final boolean requery)
{
try {
String header;
Optional<byte[]> body;
@@ -141,7 +129,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
sendMessageMeter.mark();
return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body)
client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body)
.thenAccept(response -> {
boolean isReceipt = message.getType() == Envelope.Type.RECEIPT;
@@ -152,6 +140,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
if (isSuccessResponse(response)) {
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
if (!isReceipt) sendDeliveryReceiptFor(message);
if (requery) processStoredMessages();
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
requeueMessage(message);
}
@@ -162,7 +151,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
});
} catch (CryptoEncodingException e) {
logger.warn("Bad signaling key", e);
return CompletableFuture.failedFuture(e);
}
}
@@ -192,42 +180,20 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
}
@VisibleForTesting
void processStoredMessages() {
if (processStoredMessagesSemaphore.tryAcquire()) {
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
final CompletableFuture<Void> queueClearedFuture = new CompletableFuture<>();
private void processStoredMessages() {
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent());
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture);
queueClearedFuture.whenComplete((v, cause) -> {
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
processStoredMessagesSemaphore.release();
if (storedMessageState.get() != StoredMessageState.EMPTY) {
processStoredMessages();
}
});
}
}
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
for (int i = 0; i < messages.getMessages().size(); i++) {
final OutgoingMessageEntity message = messages.getMessages().get(i);
final Envelope.Builder builder = Envelope.newBuilder()
.setType(Envelope.Type.valueOf(message.getType()))
.setTimestamp(message.getTimestamp())
.setServerTimestamp(message.getServerTimestamp());
while (iterator.hasNext()) {
OutgoingMessageEntity message = iterator.next();
Envelope.Builder builder = Envelope.newBuilder()
.setType(Envelope.Type.valueOf(message.getType()))
.setTimestamp(message.getTimestamp())
.setServerTimestamp(message.getServerTimestamp());
if (!Util.isEmpty(message.getSource())) {
builder.setSource(message.getSource())
.setSourceDevice(message.getSourceDevice());
.setSourceDevice(message.getSourceDevice());
}
if (message.getMessage() != null) {
@@ -242,40 +208,33 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
builder.setRelay(message.getRelay());
}
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())));
sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())), !iterator.hasNext() && messages.hasMore());
}
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
if (messages.hasMore()) {
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
} else {
queueClearedFuture.complete(null);
}
});
if (!messages.hasMore()) {
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
}
}
@Override
public void handleNewMessagesAvailable() {
messageAvailableMeter.mark();
storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
processStoredMessages();
}
@Override
public void handleNewEphemeralMessageAvailable() {
ephemeralMessageAvailableMeter.mark();
messagesManager.takeEphemeralMessage(account.getUuid(), device.getId())
.ifPresent(message -> sendMessage(message, Optional.empty()));
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId());
if (maybeMessage.isPresent()) {
sendMessage(maybeMessage.get(), Optional.empty(), false);
}
}
@Override
public void handleMessagesPersisted() {
messagesPersistedMeter.mark();
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
processStoredMessages();
}
@Override