mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 23:48:06 +01:00
Revert "Revert keyspace delivery for all messages"
This reverts commit 4dc49604b6.
This commit is contained in:
committed by
Jon Chambers
parent
8016e84bc7
commit
62c31eb202
@@ -300,7 +300,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
PubSubManager pubSubManager = new PubSubManager(pubsubClient, dispatchManager);
|
||||
APNSender apnSender = new APNSender(accountsManager, config.getApnConfiguration());
|
||||
GCMSender gcmSender = new GCMSender(accountsManager, config.getGcmConfiguration().getApiKey());
|
||||
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager);
|
||||
WebsocketSender websocketSender = new WebsocketSender(messagesManager, pubSubManager, clientPresenceManager, featureFlagsManager);
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), cacheCluster);
|
||||
|
||||
AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
|
||||
@@ -211,7 +211,8 @@ public class MessageController {
|
||||
final OutgoingMessageEntityList outgoingMessages = messagesManager.getMessagesForDevice(account.getNumber(),
|
||||
account.getUuid(),
|
||||
account.getAuthenticatedDevice().get().getId(),
|
||||
userAgent);
|
||||
userAgent,
|
||||
false);
|
||||
|
||||
outgoingMessageListSizeHistogram.update(outgoingMessages.getMessages().size());
|
||||
|
||||
|
||||
@@ -69,21 +69,43 @@ public class WebsocketSender {
|
||||
private final MessagesManager messagesManager;
|
||||
private final PubSubManager pubSubManager;
|
||||
private final ClientPresenceManager clientPresenceManager;
|
||||
private final FeatureFlagsManager featureFlagsManager;
|
||||
|
||||
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager) {
|
||||
private static final String KEYSPACE_DELIVERY_FEATURE_FLAG = "keyspace-delivery-for-all-messages";
|
||||
|
||||
public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager, ClientPresenceManager clientPresenceManager, final FeatureFlagsManager featureFlagsManager) {
|
||||
this.messagesManager = messagesManager;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.clientPresenceManager = clientPresenceManager;
|
||||
this.featureFlagsManager = featureFlagsManager;
|
||||
}
|
||||
|
||||
public boolean sendMessage(Account account, Device device, Envelope message, Type channel, boolean online) {
|
||||
final boolean clientPresent = clientPresenceManager.isPresent(account.getUuid(), device.getId());
|
||||
|
||||
if (online) {
|
||||
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
|
||||
if (clientPresent) {
|
||||
ephemeralOnlineCounter.increment();
|
||||
messagesManager.insertEphemeral(account.getUuid(), device.getId(), message);
|
||||
return true;
|
||||
} else {
|
||||
ephemeralOfflineCounter.increment();
|
||||
return false;
|
||||
}
|
||||
} else if (featureFlagsManager.isFeatureFlagActive(KEYSPACE_DELIVERY_FEATURE_FLAG)) {
|
||||
messagesManager.insert(account.getUuid(), device.getId(), message);
|
||||
|
||||
if (clientPresent) {
|
||||
if (channel == Type.APN) apnOnlineMeter.mark();
|
||||
else if (channel == Type.GCM) gcmOnlineMeter.mark();
|
||||
else websocketOnlineMeter.mark();
|
||||
|
||||
return true;
|
||||
} else {
|
||||
if (channel == Type.APN) apnOfflineMeter.mark();
|
||||
else if (channel == Type.GCM) gcmOfflineMeter.mark();
|
||||
else websocketOfflineMeter.mark();
|
||||
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
@@ -95,7 +117,7 @@ public class WebsocketSender {
|
||||
|
||||
pubSubManager.publish(address, pubSubMessage);
|
||||
|
||||
if (clientPresenceManager.isPresent(account.getUuid(), device.getId())) {
|
||||
if (clientPresent) {
|
||||
if (channel == Type.APN) apnOnlineMeter.mark();
|
||||
else if (channel == Type.GCM) gcmOnlineMeter.mark();
|
||||
else websocketOnlineMeter.mark();
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||
import org.whispersystems.textsecuregcm.redis.RedisOperation;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
@@ -49,10 +50,10 @@ public class MessagesManager {
|
||||
return messagesCache.takeEphemeralMessage(destinationUuid, destinationDevice);
|
||||
}
|
||||
|
||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent) {
|
||||
public OutgoingMessageEntityList getMessagesForDevice(String destination, UUID destinationUuid, long destinationDevice, final String userAgent, final boolean cachedMessagesOnly) {
|
||||
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(destinationUuid, destinationDevice, userAgent));
|
||||
|
||||
List<OutgoingMessageEntity> messages = this.messages.load(destination, destinationDevice);
|
||||
List<OutgoingMessageEntity> messages = cachedMessagesOnly ? new ArrayList<>() : this.messages.load(destination, destinationDevice);
|
||||
|
||||
if (messages.size() <= Messages.RESULT_SET_CHUNK_SIZE) {
|
||||
messages.addAll(messagesCache.get(destinationUuid, destinationDevice, Messages.RESULT_SET_CHUNK_SIZE - messages.size()));
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.slf4j.Logger;
|
||||
@@ -31,9 +32,12 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
@@ -63,6 +67,16 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
private final WebSocketClient client;
|
||||
private final String connectionId;
|
||||
|
||||
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
|
||||
private final AtomicReference<StoredMessageState> storedMessageState = new AtomicReference<>(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
|
||||
|
||||
private enum StoredMessageState {
|
||||
EMPTY,
|
||||
CACHED_NEW_MESSAGES_AVAILABLE,
|
||||
PERSISTED_NEW_MESSAGES_AVAILABLE
|
||||
}
|
||||
|
||||
public WebSocketConnection(PushSender pushSender,
|
||||
ReceiptSender receiptSender,
|
||||
MessagesManager messagesManager,
|
||||
@@ -88,11 +102,12 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
switch (pubSubMessage.getType().getNumber()) {
|
||||
case PubSubMessage.Type.QUERY_DB_VALUE:
|
||||
pubSubPersistedMeter.mark();
|
||||
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
processStoredMessages();
|
||||
break;
|
||||
case PubSubMessage.Type.DELIVER_VALUE:
|
||||
pubSubNewMessageMeter.mark();
|
||||
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty(), false);
|
||||
sendMessage(Envelope.parseFrom(pubSubMessage.getContent()), Optional.empty());
|
||||
break;
|
||||
default:
|
||||
logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber());
|
||||
@@ -111,10 +126,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
processStoredMessages();
|
||||
}
|
||||
|
||||
private void sendMessage(final Envelope message,
|
||||
final Optional<StoredMessageInfo> storedMessageInfo,
|
||||
final boolean requery)
|
||||
{
|
||||
private CompletableFuture<Void> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
|
||||
try {
|
||||
String header;
|
||||
Optional<byte[]> body;
|
||||
@@ -129,7 +141,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
|
||||
sendMessageMeter.mark();
|
||||
|
||||
client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body)
|
||||
return client.sendRequest("PUT", "/api/v1/message", List.of(header, TimestampHeaderUtil.getTimestampHeader()), body)
|
||||
.thenAccept(response -> {
|
||||
boolean isReceipt = message.getType() == Envelope.Type.RECEIPT;
|
||||
|
||||
@@ -140,7 +152,6 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
if (isSuccessResponse(response)) {
|
||||
if (storedMessageInfo.isPresent()) messagesManager.delete(account.getNumber(), account.getUuid(), device.getId(), storedMessageInfo.get().id, storedMessageInfo.get().cached);
|
||||
if (!isReceipt) sendDeliveryReceiptFor(message);
|
||||
if (requery) processStoredMessages();
|
||||
} else if (!isSuccessResponse(response) && !storedMessageInfo.isPresent()) {
|
||||
requeueMessage(message);
|
||||
}
|
||||
@@ -151,6 +162,7 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
});
|
||||
} catch (CryptoEncodingException e) {
|
||||
logger.warn("Bad signaling key", e);
|
||||
return CompletableFuture.failedFuture(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,20 +192,42 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
|
||||
}
|
||||
|
||||
private void processStoredMessages() {
|
||||
OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent());
|
||||
Iterator<OutgoingMessageEntity> iterator = messages.getMessages().iterator();
|
||||
@VisibleForTesting
|
||||
void processStoredMessages() {
|
||||
if (processStoredMessagesSemaphore.tryAcquire()) {
|
||||
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
|
||||
final CompletableFuture<Void> queueClearedFuture = new CompletableFuture<>();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
OutgoingMessageEntity message = iterator.next();
|
||||
Envelope.Builder builder = Envelope.newBuilder()
|
||||
.setType(Envelope.Type.valueOf(message.getType()))
|
||||
.setTimestamp(message.getTimestamp())
|
||||
.setServerTimestamp(message.getServerTimestamp());
|
||||
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture);
|
||||
|
||||
queueClearedFuture.whenComplete((v, cause) -> {
|
||||
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
|
||||
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
|
||||
}
|
||||
|
||||
processStoredMessagesSemaphore.release();
|
||||
|
||||
if (storedMessageState.get() != StoredMessageState.EMPTY) {
|
||||
processStoredMessages();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
|
||||
final OutgoingMessageEntityList messages = messagesManager.getMessagesForDevice(account.getNumber(), account.getUuid(), device.getId(), client.getUserAgent(), cachedMessagesOnly);
|
||||
final CompletableFuture<?>[] sendFutures = new CompletableFuture[messages.getMessages().size()];
|
||||
|
||||
for (int i = 0; i < messages.getMessages().size(); i++) {
|
||||
final OutgoingMessageEntity message = messages.getMessages().get(i);
|
||||
final Envelope.Builder builder = Envelope.newBuilder()
|
||||
.setType(Envelope.Type.valueOf(message.getType()))
|
||||
.setTimestamp(message.getTimestamp())
|
||||
.setServerTimestamp(message.getServerTimestamp());
|
||||
|
||||
if (!Util.isEmpty(message.getSource())) {
|
||||
builder.setSource(message.getSource())
|
||||
.setSourceDevice(message.getSourceDevice());
|
||||
.setSourceDevice(message.getSourceDevice());
|
||||
}
|
||||
|
||||
if (message.getMessage() != null) {
|
||||
@@ -208,33 +242,40 @@ public class WebSocketConnection implements DispatchChannel, MessageAvailability
|
||||
builder.setRelay(message.getRelay());
|
||||
}
|
||||
|
||||
sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())), !iterator.hasNext() && messages.hasMore());
|
||||
sendFutures[i] = sendMessage(builder.build(), Optional.of(new StoredMessageInfo(message.getId(), message.isCached())));
|
||||
}
|
||||
|
||||
if (!messages.hasMore()) {
|
||||
client.sendRequest("PUT", "/api/v1/queue/empty", Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
|
||||
}
|
||||
CompletableFuture.allOf(sendFutures).whenComplete((v, cause) -> {
|
||||
if (messages.hasMore()) {
|
||||
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
|
||||
} else {
|
||||
queueClearedFuture.complete(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNewMessagesAvailable() {
|
||||
messageAvailableMeter.mark();
|
||||
|
||||
storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
|
||||
processStoredMessages();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNewEphemeralMessageAvailable() {
|
||||
ephemeralMessageAvailableMeter.mark();
|
||||
|
||||
final Optional<Envelope> maybeMessage = messagesManager.takeEphemeralMessage(account.getUuid(), device.getId());
|
||||
|
||||
if (maybeMessage.isPresent()) {
|
||||
sendMessage(maybeMessage.get(), Optional.empty(), false);
|
||||
}
|
||||
messagesManager.takeEphemeralMessage(account.getUuid(), device.getId())
|
||||
.ifPresent(message -> sendMessage(message, Optional.empty()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessagesPersisted() {
|
||||
messagesPersistedMeter.mark();
|
||||
|
||||
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
processStoredMessages();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user