mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 19:28:10 +01:00
Use reactive streams for WebSocket message queue
Initially, uses `ExperimentEnrollmentManager` to do a safe rollout.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* Copyright 2013-2022 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
@@ -11,14 +11,19 @@ import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
|
||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
||||
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
|
||||
@@ -32,32 +37,48 @@ import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
||||
|
||||
public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Timer durationTimer = metricRegistry.timer(name(WebSocketConnection.class, "connected_duration" ));
|
||||
private static final Timer unauthenticatedDurationTimer = metricRegistry.timer(name(WebSocketConnection.class, "unauthenticated_connection_duration"));
|
||||
private static final Counter openWebsocketCounter = metricRegistry.counter(name(WebSocketConnection.class, "open_websockets"));
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private static final Timer durationTimer = metricRegistry.timer(
|
||||
name(WebSocketConnection.class, "connected_duration"));
|
||||
private static final Timer unauthenticatedDurationTimer = metricRegistry.timer(
|
||||
name(WebSocketConnection.class, "unauthenticated_connection_duration"));
|
||||
private static final Counter openWebsocketCounter = metricRegistry.counter(
|
||||
name(WebSocketConnection.class, "open_websockets"));
|
||||
|
||||
private static final String OPEN_WEBSOCKET_COUNTER_NAME = MetricsUtil.name(WebSocketConnection.class,
|
||||
"openWebsockets");
|
||||
|
||||
private static final long RENEW_PRESENCE_INTERVAL_MINUTES = 5;
|
||||
|
||||
private static final String REACTIVE_MESSAGE_QUEUE_EXPERIMENT_NAME = "reactive_message_queue_v1";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class);
|
||||
|
||||
private final ReceiptSender receiptSender;
|
||||
private final MessagesManager messagesManager;
|
||||
private final ReceiptSender receiptSender;
|
||||
private final MessagesManager messagesManager;
|
||||
private final PushNotificationManager pushNotificationManager;
|
||||
private final ClientPresenceManager clientPresenceManager;
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
private final ExperimentEnrollmentManager experimentEnrollmentManager;
|
||||
|
||||
private final AtomicInteger openReactiveWebSockets = new AtomicInteger(0);
|
||||
private final AtomicInteger openStandardWebSockets = new AtomicInteger(0);
|
||||
|
||||
public AuthenticatedConnectListener(ReceiptSender receiptSender,
|
||||
MessagesManager messagesManager,
|
||||
PushNotificationManager pushNotificationManager,
|
||||
ClientPresenceManager clientPresenceManager,
|
||||
ScheduledExecutorService scheduledExecutorService)
|
||||
{
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
ExperimentEnrollmentManager experimentEnrollmentManager) {
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
this.pushNotificationManager = pushNotificationManager;
|
||||
this.clientPresenceManager = clientPresenceManager;
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
this.experimentEnrollmentManager = experimentEnrollmentManager;
|
||||
|
||||
Metrics.gauge(OPEN_WEBSOCKET_COUNTER_NAME, Tags.of("reactive", String.valueOf(true)), openReactiveWebSockets);
|
||||
Metrics.gauge(OPEN_WEBSOCKET_COUNTER_NAME, Tags.of("reactive", String.valueOf(false)), openStandardWebSockets);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -66,43 +87,56 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
|
||||
final AuthenticatedAccount auth = context.getAuthenticated(AuthenticatedAccount.class);
|
||||
final Device device = auth.getAuthenticatedDevice();
|
||||
final Timer.Context timer = durationTimer.time();
|
||||
final boolean enrolledInReactiveMessageQueue = experimentEnrollmentManager.isEnrolled(
|
||||
auth.getAccount().getUuid(),
|
||||
REACTIVE_MESSAGE_QUEUE_EXPERIMENT_NAME);
|
||||
final WebSocketConnection connection = new WebSocketConnection(receiptSender,
|
||||
messagesManager, auth, device,
|
||||
context.getClient(),
|
||||
scheduledExecutorService);
|
||||
scheduledExecutorService,
|
||||
enrolledInReactiveMessageQueue);
|
||||
|
||||
openWebsocketCounter.inc();
|
||||
if (enrolledInReactiveMessageQueue) {
|
||||
openReactiveWebSockets.incrementAndGet();
|
||||
} else {
|
||||
openStandardWebSockets.incrementAndGet();
|
||||
}
|
||||
|
||||
pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), device, context.getClient().getUserAgent());
|
||||
|
||||
final AtomicReference<ScheduledFuture<?>> renewPresenceFutureReference = new AtomicReference<>();
|
||||
|
||||
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
||||
@Override
|
||||
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
|
||||
openWebsocketCounter.dec();
|
||||
timer.stop();
|
||||
|
||||
final ScheduledFuture<?> renewPresenceFuture = renewPresenceFutureReference.get();
|
||||
|
||||
if (renewPresenceFuture != null) {
|
||||
renewPresenceFuture.cancel(false);
|
||||
}
|
||||
|
||||
connection.stop();
|
||||
|
||||
RedisOperation.unchecked(
|
||||
() -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId()));
|
||||
RedisOperation.unchecked(() -> {
|
||||
messagesManager.removeMessageAvailabilityListener(connection);
|
||||
|
||||
if (messagesManager.hasCachedMessages(auth.getAccount().getUuid(), device.getId())) {
|
||||
try {
|
||||
pushNotificationManager.sendNewMessageNotification(auth.getAccount(), device.getId(), true);
|
||||
} catch (NotPushRegisteredException ignored) {
|
||||
}
|
||||
}
|
||||
});
|
||||
context.addListener((closingContext, statusCode, reason) -> {
|
||||
openWebsocketCounter.dec();
|
||||
if (enrolledInReactiveMessageQueue) {
|
||||
openReactiveWebSockets.decrementAndGet();
|
||||
} else {
|
||||
openStandardWebSockets.decrementAndGet();
|
||||
}
|
||||
|
||||
timer.stop();
|
||||
|
||||
final ScheduledFuture<?> renewPresenceFuture = renewPresenceFutureReference.get();
|
||||
|
||||
if (renewPresenceFuture != null) {
|
||||
renewPresenceFuture.cancel(false);
|
||||
}
|
||||
|
||||
connection.stop();
|
||||
|
||||
RedisOperation.unchecked(
|
||||
() -> clientPresenceManager.clearPresence(auth.getAccount().getUuid(), device.getId()));
|
||||
RedisOperation.unchecked(() -> {
|
||||
messagesManager.removeMessageAvailabilityListener(connection);
|
||||
|
||||
if (messagesManager.hasCachedMessages(auth.getAccount().getUuid(), device.getId())) {
|
||||
try {
|
||||
pushNotificationManager.sendNewMessageNotification(auth.getAccount(), device.getId(), true);
|
||||
} catch (NotPushRegisteredException ignored) {
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
try {
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* Copyright 2013-2022 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Meter;
|
||||
@@ -34,11 +33,13 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import javax.ws.rs.WebApplicationException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
|
||||
import org.whispersystems.textsecuregcm.controllers.MessageController;
|
||||
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import org.whispersystems.textsecuregcm.push.DisplacedPresenceListener;
|
||||
@@ -49,13 +50,14 @@ import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.TimestampHeaderUtil;
|
||||
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
|
||||
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
|
||||
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
|
||||
import org.whispersystems.websocket.WebSocketClient;
|
||||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener {
|
||||
|
||||
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
@@ -70,8 +72,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
name(WebSocketConnection.class, "messagesPersisted"));
|
||||
private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent"));
|
||||
private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures"));
|
||||
private static final Meter discardedMessagesMeter = metricRegistry.meter(
|
||||
name(WebSocketConnection.class, "discardedMessages"));
|
||||
|
||||
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
|
||||
"initialQueueLength");
|
||||
@@ -85,11 +85,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
"messageAvailableAfterClientClosed");
|
||||
private static final String STATUS_CODE_TAG = "status";
|
||||
private static final String STATUS_MESSAGE_TAG = "message";
|
||||
private static final String REACTIVE_TAG = "reactive";
|
||||
|
||||
private static final long SLOW_DRAIN_THRESHOLD = 10_000;
|
||||
|
||||
@VisibleForTesting
|
||||
static final int MAX_DESKTOP_MESSAGE_SIZE = 1024 * 1024;
|
||||
static final int MESSAGE_PUBLISHER_LIMIT_RATE = 100;
|
||||
|
||||
@VisibleForTesting
|
||||
static final int MAX_CONSECUTIVE_RETRIES = 5;
|
||||
@@ -111,18 +112,19 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
private final boolean isDesktopClient;
|
||||
|
||||
private final Semaphore processStoredMessagesSemaphore = new Semaphore(1);
|
||||
private final AtomicReference<StoredMessageState> storedMessageState = new AtomicReference<>(
|
||||
StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
private final AtomicBoolean sentInitialQueueEmptyMessage = new AtomicBoolean(false);
|
||||
private final LongAdder sentMessageCounter = new LongAdder();
|
||||
private final AtomicLong queueDrainStartTime = new AtomicLong();
|
||||
private final AtomicInteger consecutiveRetries = new AtomicInteger();
|
||||
private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
|
||||
private final AtomicInteger consecutiveRetries = new AtomicInteger();
|
||||
private final AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
|
||||
private final AtomicReference<Disposable> messageSubscription = new AtomicReference<>();
|
||||
|
||||
private final Random random = new Random();
|
||||
private final boolean useReactive;
|
||||
private Scheduler reactiveScheduler;
|
||||
|
||||
private enum StoredMessageState {
|
||||
EMPTY,
|
||||
@@ -135,7 +137,28 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
AuthenticatedAccount auth,
|
||||
Device device,
|
||||
WebSocketClient client,
|
||||
ScheduledExecutorService scheduledExecutorService) {
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
boolean useReactive) {
|
||||
|
||||
this(receiptSender,
|
||||
messagesManager,
|
||||
auth,
|
||||
device,
|
||||
client,
|
||||
scheduledExecutorService,
|
||||
useReactive,
|
||||
Schedulers.boundedElastic());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
WebSocketConnection(ReceiptSender receiptSender,
|
||||
MessagesManager messagesManager,
|
||||
AuthenticatedAccount auth,
|
||||
Device device,
|
||||
WebSocketClient client,
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
boolean useReactive,
|
||||
Scheduler reactiveScheduler) {
|
||||
|
||||
this(receiptSender,
|
||||
messagesManager,
|
||||
@@ -143,7 +166,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
device,
|
||||
client,
|
||||
DEFAULT_SEND_FUTURES_TIMEOUT_MILLIS,
|
||||
scheduledExecutorService);
|
||||
scheduledExecutorService,
|
||||
useReactive,
|
||||
reactiveScheduler);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@@ -153,7 +178,9 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
Device device,
|
||||
WebSocketClient client,
|
||||
int sendFuturesTimeoutMillis,
|
||||
ScheduledExecutorService scheduledExecutorService) {
|
||||
ScheduledExecutorService scheduledExecutorService,
|
||||
boolean useReactive,
|
||||
Scheduler reactiveScheduler) {
|
||||
|
||||
this.receiptSender = receiptSender;
|
||||
this.messagesManager = messagesManager;
|
||||
@@ -162,16 +189,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
this.client = client;
|
||||
this.sendFuturesTimeoutMillis = sendFuturesTimeoutMillis;
|
||||
this.scheduledExecutorService = scheduledExecutorService;
|
||||
|
||||
Optional<ClientPlatform> maybePlatform;
|
||||
|
||||
try {
|
||||
maybePlatform = Optional.of(UserAgentUtil.parseUserAgentString(client.getUserAgent()).getPlatform());
|
||||
} catch (final UnrecognizedUserAgentException e) {
|
||||
maybePlatform = Optional.empty();
|
||||
}
|
||||
|
||||
this.isDesktopClient = maybePlatform.map(platform -> platform == ClientPlatform.DESKTOP).orElse(false);
|
||||
this.useReactive = useReactive;
|
||||
this.reactiveScheduler = reactiveScheduler;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
@@ -186,10 +205,15 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
future.cancel(false);
|
||||
}
|
||||
|
||||
final Disposable subscription = messageSubscription.get();
|
||||
if (subscription != null) {
|
||||
subscription.dispose();
|
||||
}
|
||||
|
||||
client.close(1000, "OK");
|
||||
}
|
||||
|
||||
private CompletableFuture<WebSocketResponseMessage> sendMessage(final Envelope message, final Optional<StoredMessageInfo> storedMessageInfo) {
|
||||
private CompletableFuture<?> sendMessage(final Envelope message, StoredMessageInfo storedMessageInfo) {
|
||||
// clear ephemeral field from the envelope
|
||||
final Optional<byte[]> body = Optional.ofNullable(message.toBuilder().clearEphemeral().build().toByteArray());
|
||||
|
||||
@@ -199,33 +223,43 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
MessageMetrics.measureAccountEnvelopeUuidMismatches(auth.getAccount(), message);
|
||||
|
||||
// X-Signal-Key: false must be sent until Android stops assuming it missing means true
|
||||
return client.sendRequest("PUT", "/api/v1/message", List.of("X-Signal-Key: false", TimestampHeaderUtil.getTimestampHeader()), body).whenComplete((response, throwable) -> {
|
||||
if (throwable == null) {
|
||||
if (isSuccessResponse(response)) {
|
||||
if (storedMessageInfo.isPresent()) {
|
||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), storedMessageInfo.get().getGuid(), storedMessageInfo.get().getServerTimestamp());
|
||||
}
|
||||
|
||||
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
||||
recordMessageDeliveryDuration(message.getTimestamp(), device);
|
||||
sendDeliveryReceiptFor(message);
|
||||
}
|
||||
} else {
|
||||
final List<Tag> tags = new ArrayList<>(
|
||||
List.of(Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
|
||||
UserAgentTagUtil.getPlatformTag(client.getUserAgent())));
|
||||
|
||||
// TODO Remove this once we've identified the cause of message rejections from desktop clients
|
||||
if (StringUtils.isNotBlank(response.getMessage())) {
|
||||
tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage()));
|
||||
return client.sendRequest("PUT", "/api/v1/message",
|
||||
List.of("X-Signal-Key: false", TimestampHeaderUtil.getTimestampHeader()), body)
|
||||
.whenComplete((ignored, throwable) -> {
|
||||
if (throwable != null) {
|
||||
sendFailuresMeter.mark();
|
||||
}
|
||||
}).thenCompose(response -> {
|
||||
final CompletableFuture<?> result;
|
||||
if (isSuccessResponse(response)) {
|
||||
|
||||
Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment();
|
||||
}
|
||||
} else {
|
||||
sendFailuresMeter.mark();
|
||||
}
|
||||
});
|
||||
result = messagesManager.delete(auth.getAccount().getUuid(), device.getId(),
|
||||
storedMessageInfo.guid(), storedMessageInfo.serverTimestamp());
|
||||
|
||||
if (message.getType() != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
|
||||
recordMessageDeliveryDuration(message.getTimestamp(), device);
|
||||
sendDeliveryReceiptFor(message);
|
||||
}
|
||||
} else {
|
||||
final List<Tag> tags = new ArrayList<>(
|
||||
List.of(
|
||||
Tag.of(STATUS_CODE_TAG, String.valueOf(response.getStatus())),
|
||||
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
|
||||
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))
|
||||
));
|
||||
|
||||
// TODO Remove this once we've identified the cause of message rejections from desktop clients
|
||||
if (StringUtils.isNotBlank(response.getMessage())) {
|
||||
tags.add(Tag.of(STATUS_MESSAGE_TAG, response.getMessage()));
|
||||
}
|
||||
|
||||
Metrics.counter(NON_SUCCESS_RESPONSE_COUNTER_NAME, tags).increment();
|
||||
|
||||
result = CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
public static void recordMessageDeliveryDuration(long timestamp, Device messageDestinationDevice) {
|
||||
@@ -260,65 +294,96 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
|
||||
@VisibleForTesting
|
||||
void processStoredMessages() {
|
||||
if (processStoredMessagesSemaphore.tryAcquire()) {
|
||||
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
|
||||
final CompletableFuture<Void> queueClearedFuture = new CompletableFuture<>();
|
||||
|
||||
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueClearedFuture);
|
||||
|
||||
queueClearedFuture.whenComplete((v, cause) -> {
|
||||
if (cause == null) {
|
||||
consecutiveRetries.set(0);
|
||||
|
||||
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
|
||||
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
|
||||
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
|
||||
|
||||
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum());
|
||||
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (drainDuration > SLOW_DRAIN_THRESHOLD) {
|
||||
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment();
|
||||
}
|
||||
|
||||
client.sendRequest("PUT", "/api/v1/queue/empty",
|
||||
Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
|
||||
}
|
||||
} else {
|
||||
storedMessageState.compareAndSet(StoredMessageState.EMPTY, state);
|
||||
}
|
||||
|
||||
processStoredMessagesSemaphore.release();
|
||||
|
||||
if (cause == null) {
|
||||
if (storedMessageState.get() != StoredMessageState.EMPTY) {
|
||||
processStoredMessages();
|
||||
}
|
||||
} else {
|
||||
if (client.isOpen()) {
|
||||
|
||||
if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) {
|
||||
logger.warn("Max consecutive retries exceeded", cause);
|
||||
client.close(1011, "Failed to retrieve messages");
|
||||
} else {
|
||||
logger.debug("Failed to clear queue", cause);
|
||||
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
|
||||
|
||||
Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment();
|
||||
|
||||
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
|
||||
retryFuture
|
||||
.set(scheduledExecutorService.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
} else {
|
||||
logger.debug("Client disconnected before queue cleared");
|
||||
}
|
||||
}
|
||||
});
|
||||
if (useReactive) {
|
||||
processStoredMessages_reactive();
|
||||
} else {
|
||||
processStoredMessage_paged();
|
||||
}
|
||||
}
|
||||
|
||||
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueClearedFuture) {
|
||||
private void processStoredMessage_paged() {
|
||||
assert !useReactive;
|
||||
|
||||
if (processStoredMessagesSemaphore.tryAcquire()) {
|
||||
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
|
||||
final CompletableFuture<Void> queueCleared = new CompletableFuture<>();
|
||||
|
||||
sendNextMessagePage(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared);
|
||||
|
||||
setQueueClearedHandler(state, queueCleared);
|
||||
}
|
||||
}
|
||||
|
||||
private void setQueueClearedHandler(final StoredMessageState state, final CompletableFuture<Void> queueCleared) {
|
||||
|
||||
queueCleared.whenComplete((v, cause) -> {
|
||||
if (cause == null) {
|
||||
consecutiveRetries.set(0);
|
||||
|
||||
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
|
||||
final List<Tag> tags = List.of(
|
||||
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
|
||||
Tag.of(REACTIVE_TAG, String.valueOf(useReactive))
|
||||
);
|
||||
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
|
||||
|
||||
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum());
|
||||
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, tags).record(drainDuration, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (drainDuration > SLOW_DRAIN_THRESHOLD) {
|
||||
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, tags).increment();
|
||||
}
|
||||
|
||||
client.sendRequest("PUT", "/api/v1/queue/empty",
|
||||
Collections.singletonList(TimestampHeaderUtil.getTimestampHeader()), Optional.empty());
|
||||
}
|
||||
} else {
|
||||
storedMessageState.compareAndSet(StoredMessageState.EMPTY, state);
|
||||
}
|
||||
|
||||
processStoredMessagesSemaphore.release();
|
||||
|
||||
if (cause == null) {
|
||||
if (storedMessageState.get() != StoredMessageState.EMPTY) {
|
||||
processStoredMessages();
|
||||
}
|
||||
} else {
|
||||
if (client.isOpen()) {
|
||||
|
||||
if (consecutiveRetries.incrementAndGet() > MAX_CONSECUTIVE_RETRIES) {
|
||||
logger.warn("Max consecutive retries exceeded", cause);
|
||||
client.close(1011, "Failed to retrieve messages");
|
||||
} else {
|
||||
logger.debug("Failed to clear queue", cause);
|
||||
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
|
||||
|
||||
Metrics.counter(QUEUE_DRAIN_RETRY_COUNTER_NAME, tags).increment();
|
||||
|
||||
final long delay = RETRY_DELAY_MILLIS + random.nextInt(RETRY_DELAY_JITTER_MILLIS);
|
||||
retryFuture
|
||||
.set(scheduledExecutorService.schedule(this::processStoredMessages, delay, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
} else {
|
||||
logger.debug("Client disconnected before queue cleared");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void processStoredMessages_reactive() {
|
||||
assert useReactive;
|
||||
|
||||
if (processStoredMessagesSemaphore.tryAcquire()) {
|
||||
final StoredMessageState state = storedMessageState.getAndSet(StoredMessageState.EMPTY);
|
||||
final CompletableFuture<Void> queueCleared = new CompletableFuture<>();
|
||||
|
||||
sendMessagesReactive(state != StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE, queueCleared);
|
||||
|
||||
setQueueClearedHandler(state, queueCleared);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendNextMessagePage(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
|
||||
try {
|
||||
final Pair<List<Envelope>, Boolean> messagesAndHasMore = messagesManager.getMessagesForDevice(
|
||||
auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
|
||||
@@ -330,25 +395,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
|
||||
for (int i = 0; i < messages.size(); i++) {
|
||||
final Envelope envelope = messages.get(i);
|
||||
final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
|
||||
|
||||
final boolean discard;
|
||||
if (isDesktopClient && envelope.getSerializedSize() > MAX_DESKTOP_MESSAGE_SIZE) {
|
||||
discard = true;
|
||||
} else if (envelope.getStory() && !client.shouldDeliverStories()) {
|
||||
discard = true;
|
||||
} else {
|
||||
discard = false;
|
||||
}
|
||||
|
||||
if (discard) {
|
||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp());
|
||||
discardedMessagesMeter.mark();
|
||||
|
||||
sendFutures[i] = CompletableFuture.completedFuture(null);
|
||||
} else {
|
||||
sendFutures[i] = sendMessage(envelope, Optional.of(new StoredMessageInfo(messageGuid, envelope.getServerTimestamp())));
|
||||
}
|
||||
sendFutures[i] = sendMessage(envelope);
|
||||
}
|
||||
|
||||
// Set a large, non-zero timeout, to prevent any failure to acknowledge receipt from blocking indefinitely
|
||||
@@ -357,16 +404,45 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
.whenComplete((v, cause) -> {
|
||||
if (cause == null) {
|
||||
if (hasMore) {
|
||||
sendNextMessagePage(cachedMessagesOnly, queueClearedFuture);
|
||||
sendNextMessagePage(cachedMessagesOnly, queueCleared);
|
||||
} else {
|
||||
queueClearedFuture.complete(null);
|
||||
queueCleared.complete(null);
|
||||
}
|
||||
} else {
|
||||
queueClearedFuture.completeExceptionally(cause);
|
||||
queueCleared.completeExceptionally(cause);
|
||||
}
|
||||
});
|
||||
} catch (final Exception e) {
|
||||
queueClearedFuture.completeExceptionally(e);
|
||||
queueCleared.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendMessagesReactive(final boolean cachedMessagesOnly, final CompletableFuture<Void> queueCleared) {
|
||||
|
||||
final Publisher<Envelope> messages =
|
||||
messagesManager.getMessagesForDeviceReactive(auth.getAccount().getUuid(), device.getId(), cachedMessagesOnly);
|
||||
|
||||
final Disposable subscription = Flux.from(messages)
|
||||
.limitRate(MESSAGE_PUBLISHER_LIMIT_RATE)
|
||||
.flatMapSequential(envelope ->
|
||||
Mono.fromFuture(sendMessage(envelope).orTimeout(sendFuturesTimeoutMillis, TimeUnit.MILLISECONDS)))
|
||||
.doOnError(queueCleared::completeExceptionally)
|
||||
.doOnComplete(() -> queueCleared.complete(null))
|
||||
.subscribeOn(reactiveScheduler)
|
||||
.subscribe();
|
||||
|
||||
messageSubscription.set(subscription);
|
||||
}
|
||||
|
||||
private CompletableFuture<?> sendMessage(Envelope envelope) {
|
||||
final UUID messageGuid = UUID.fromString(envelope.getServerGuid());
|
||||
|
||||
if (envelope.getStory() && !client.shouldDeliverStories()) {
|
||||
messagesManager.delete(auth.getAccount().getUuid(), device.getId(), messageGuid, envelope.getServerTimestamp());
|
||||
|
||||
return CompletableFuture.completedFuture(null);
|
||||
} else {
|
||||
return sendMessage(envelope, new StoredMessageInfo(messageGuid, envelope.getServerTimestamp()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -381,6 +457,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
messageAvailableMeter.mark();
|
||||
|
||||
storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
|
||||
|
||||
processStoredMessages();
|
||||
|
||||
return true;
|
||||
@@ -396,6 +473,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
messagesPersistedMeter.mark();
|
||||
|
||||
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
|
||||
|
||||
processStoredMessages();
|
||||
|
||||
return true;
|
||||
@@ -405,7 +483,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
public void handleDisplacement(final boolean connectedElsewhere) {
|
||||
final Tags tags = Tags.of(
|
||||
UserAgentTagUtil.getPlatformTag(client.getUserAgent()),
|
||||
Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)));
|
||||
Tag.of("connectedElsewhere", String.valueOf(connectedElsewhere)),
|
||||
Tag.of(REACTIVE_TAG, String.valueOf(useReactive)));
|
||||
|
||||
Metrics.counter(DISPLACEMENT_COUNTER_NAME, tags).increment();
|
||||
|
||||
@@ -429,21 +508,7 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
|
||||
}
|
||||
}
|
||||
|
||||
private static class StoredMessageInfo {
|
||||
private final UUID guid;
|
||||
private final long serverTimestamp;
|
||||
private record StoredMessageInfo(UUID guid, long serverTimestamp) {
|
||||
|
||||
public StoredMessageInfo(UUID guid, long serverTimestamp) {
|
||||
this.guid = guid;
|
||||
this.serverTimestamp = serverTimestamp;
|
||||
}
|
||||
|
||||
public UUID getGuid() {
|
||||
return guid;
|
||||
}
|
||||
|
||||
public long getServerTimestamp() {
|
||||
return serverTimestamp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user