diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 6d9f8779e..fded9d827 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -219,39 +219,35 @@ public class WebSocketConnection implements DisconnectionRequestListener { .thenReturn(entry); case MessageStreamEntry.QueueEmpty _ -> Mono.just(entry); }, MESSAGE_SENDER_MAX_CONCURRENCY) - .subscribeOn(messageDeliveryScheduler) - .subscribe( - entry -> { - if (entry instanceof MessageStreamEntry.QueueEmpty) { - final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); + // `ConflictingMessageConsumerException` is handled before processing messages + .doOnError(throwable -> !(throwable instanceof ConflictingMessageConsumerException), throwable -> { + measureSendMessageErrors(throwable); - Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); - Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); + if (!client.isOpen()) { + logger.debug("Client disconnected before queue cleared"); + return; + } - if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { - Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); - } + client.close(1011, "Failed to retrieve messages"); + }) + // Make sure we process message acknowledgements before sending the "queue clear" signal + .doOnNext(entry -> { + if (entry instanceof MessageStreamEntry.QueueEmpty) { + final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); - client.sendRequest("PUT", "/api/v1/queue/empty", - Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); - } - }, - throwable -> { - // `ConflictingMessageConsumerException` is handled before processing messages - if (throwable instanceof ConflictingMessageConsumerException) { - return; - } + Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); + Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); - measureSendMessageErrors(throwable); - - if (!client.isOpen()) { - logger.debug("Client disconnected before queue cleared"); - return; - } - - client.close(1011, "Failed to retrieve messages"); + if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { + Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); } - )); + + client.sendRequest("PUT", "/api/v1/queue/empty", + Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); + } + }) + .subscribeOn(messageDeliveryScheduler) + .subscribe()); } public void stop() {