Measure only errors that terminate a message stream

This commit is contained in:
Jon Chambers
2025-07-31 14:38:21 -04:00
committed by Jon Chambers
parent b7e64e09a3
commit 4923b6da68

View File

@@ -377,12 +377,12 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn
}
})
.flatMapSequential(envelope ->
Mono.defer(() -> Mono.fromFuture(() -> sendMessage(envelope)).timeout(sendFuturesTimeout))
.doOnError(this::measureSendMessageErrors)
Mono.fromFuture(() -> sendMessage(envelope)).timeout(sendFuturesTimeout)
// Note that this will retry both for "send to client" timeouts and failures to delete messages on
// acknowledgement
.retryWhen(Retry.backoff(4, Duration.ofSeconds(1))),
MESSAGE_SENDER_MAX_CONCURRENCY)
.doOnError(this::measureSendMessageErrors)
.subscribeOn(messageDeliveryScheduler)
.subscribe(
// no additional consumer of values - it is Flux<Void> by now