diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index feb2ac24c..3665cd2c3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -219,35 +219,39 @@ public class WebSocketConnection implements DisconnectionRequestListener { .thenReturn(entry); case MessageStreamEntry.QueueEmpty _ -> Mono.just(entry); }, MESSAGE_SENDER_MAX_CONCURRENCY) - // `ConflictingMessageConsumerException` is handled before processing messages - .doOnError(throwable -> !(throwable instanceof ConflictingMessageConsumerException), throwable -> { - measureSendMessageErrors(throwable); - - if (!client.isOpen()) { - logger.debug("Client disconnected before queue cleared"); - return; - } - - client.close(1011, "Failed to retrieve messages"); - }) - // Make sure we process message acknowledgements before sending the "queue clear" signal - .doOnNext(entry -> { - if (entry instanceof MessageStreamEntry.QueueEmpty) { - final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); - - Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); - Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); - - if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { - Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); - } - - client.sendRequest("PUT", "/api/v1/queue/empty", - Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); - } - }) .subscribeOn(messageDeliveryScheduler) - .subscribe()); + .subscribe( + entry -> { + if (entry instanceof MessageStreamEntry.QueueEmpty) { + final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); + + Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); + Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); + + if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { + Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); + } + + client.sendRequest("PUT", "/api/v1/queue/empty", + Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); + } + }, + throwable -> { + // `ConflictingMessageConsumerException` is handled before processing messages + if (throwable instanceof ConflictingMessageConsumerException) { + return; + } + + measureSendMessageErrors(throwable); + + if (!client.isOpen()) { + logger.debug("Client disconnected before queue cleared"); + return; + } + + client.close(1011, "Failed to retrieve messages"); + } + )); } public void stop() {