Revert "Move error/entry handling to subscribe handlers"

This reverts commit 7d10209198.
This commit is contained in:
Jon Chambers
2025-08-13 15:46:50 -04:00
committed by Jon Chambers
parent 7d10209198
commit 4d521cea42

View File

@@ -219,39 +219,35 @@ public class WebSocketConnection implements DisconnectionRequestListener {
.thenReturn(entry);
case MessageStreamEntry.QueueEmpty _ -> Mono.just(entry);
}, MESSAGE_SENDER_MAX_CONCURRENCY)
.subscribeOn(messageDeliveryScheduler)
.subscribe(
entry -> {
if (entry instanceof MessageStreamEntry.QueueEmpty) {
final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos);
// `ConflictingMessageConsumerException` is handled before processing messages
.doOnError(throwable -> !(throwable instanceof ConflictingMessageConsumerException), throwable -> {
measureSendMessageErrors(throwable);
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum());
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration);
if (!client.isOpen()) {
logger.debug("Client disconnected before queue cleared");
return;
}
if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) {
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment();
}
client.close(1011, "Failed to retrieve messages");
})
// Make sure we process message acknowledgements before sending the "queue clear" signal
.doOnNext(entry -> {
if (entry instanceof MessageStreamEntry.QueueEmpty) {
final Duration drainDuration = Duration.ofNanos(System.nanoTime() - queueDrainStartNanos);
client.sendRequest("PUT", "/api/v1/queue/empty",
Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty());
}
},
throwable -> {
// `ConflictingMessageConsumerException` is handled before processing messages
if (throwable instanceof ConflictingMessageConsumerException) {
return;
}
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, platformTag).record(sentMessageCounter.sum());
Metrics.timer(INITIAL_QUEUE_DRAIN_TIMER_NAME, platformTag).record(drainDuration);
measureSendMessageErrors(throwable);
if (!client.isOpen()) {
logger.debug("Client disconnected before queue cleared");
return;
}
client.close(1011, "Failed to retrieve messages");
if (drainDuration.compareTo(SLOW_DRAIN_THRESHOLD) > 0) {
Metrics.counter(SLOW_QUEUE_DRAIN_COUNTER_NAME, platformTag).increment();
}
));
client.sendRequest("PUT", "/api/v1/queue/empty",
Collections.singletonList(HeaderUtils.getTimestampHeader()), Optional.empty());
}
})
.subscribeOn(messageDeliveryScheduler)
.subscribe());
}
public void stop() {