diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index 2d755891f..0163d20de 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -377,12 +377,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn } }) .flatMapSequential(envelope -> - Mono.defer(() -> Mono.fromFuture(() -> sendMessage(envelope)).timeout(sendFuturesTimeout)) - .doOnError(this::measureSendMessageErrors) + Mono.fromFuture(() -> sendMessage(envelope)).timeout(sendFuturesTimeout) // Note that this will retry both for "send to client" timeouts and failures to delete messages on // acknowledgement .retryWhen(Retry.backoff(4, Duration.ofSeconds(1))), MESSAGE_SENDER_MAX_CONCURRENCY) + .doOnError(this::measureSendMessageErrors) .subscribeOn(messageDeliveryScheduler) .subscribe( // no additional consumer of values - it is Flux by now