From 4d521cea4285a35346dd360e4543ccbda3f28616 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 13 Aug 2025 15:46:50 -0400 Subject: [PATCH] Revert "Move error/entry handling to `subscribe` handlers" This reverts commit 7d10209198d12e5426d526e2c3e79ee6137218d7. --- .../websocket/WebSocketConnection.java | 52 +++++++++---------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 6d9f8779e..fded9d827 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -219,39 +219,35 @@ public class WebSocketConnection implements DisconnectionRequestListener { .thenReturn(entry); case MessageStreamEntry.QueueEmpty _ -> Mono.just(entry); }, MESSAGE_SENDER_MAX_CONCURRENCY) - .subscribeOn(messageDeliveryScheduler) - .subscribe( - entry -> { - if (entry instanceof MessageStreamEntry.QueueEmpty) { - final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); + // `ConflictingMessageConsumerException` is handled before processing messages + .doOnError(throwable -> !(throwable instanceof ConflictingMessageConsumerException), throwable -> { + measureSendMessageErrors(throwable); - Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); - Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); + if (!client.isOpen()) { + logger.debug("Client disconnected before queue cleared"); + return; + } - if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { - Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); - } + client.close(1011, "Failed to retrieve messages"); + }) + // Make sure we process message acknowledgements before sending the "queue clear" signal + .doOnNext(entry -> { + if (entry instanceof MessageStreamEntry.QueueEmpty) { + final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos); - client.sendRequest("PUT", "/api/v1/queue/empty", - Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); - } - }, - throwable -> { - // `ConflictingMessageConsumerException` is handled before processing messages - if (throwable instanceof ConflictingMessageConsumerException) { - return; - } + Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum()); + Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration); - measureSendMessageErrors(throwable); - - if (!client.isOpen()) { - logger.debug("Client disconnected before queue cleared"); - return; - } - - client.close(1011, "Failed to retrieve messages"); + if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) { + Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment(); } - )); + + client.sendRequest("PUT", "/api/v1/queue/empty", + Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty()); + } + }) + .subscribeOn(messageDeliveryScheduler) + .subscribe()); } public void stop() {