Clean up MessageAvailabilityListener if the websocket client is closed

This commit is contained in:
Chris Eager
2022-08-01 14:37:05 -05:00
committed by Chris Eager
parent a06a663b94
commit 55df593561
6 changed files with 154 additions and 29 deletions

View File

@@ -12,7 +12,15 @@ package org.whispersystems.textsecuregcm.storage;
*/
public interface MessageAvailabilityListener {
void handleNewMessagesAvailable();
/**
* @return whether the listener is still active. {@code false} indicates the listener can no longer handle messages
* and may be discarded
*/
boolean handleNewMessagesAvailable();
void handleMessagesPersisted();
/**
* @return whether the listener is still active. {@code false} indicates the listener can no longer handle messages
* and may be discarded
*/
boolean handleMessagesPersisted();
}

View File

@@ -351,7 +351,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
newMessageNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleNewMessagesAvailable);
findListener(channel).ifPresent(listener -> {
if (!listener.handleNewMessagesAvailable()) {
removeMessageAvailabilityListener(listener);
}
});
} catch (final Exception e) {
logger.warn("Unexpected error handling new message", e);
}
@@ -360,7 +364,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
queuePersistedNotificationCounter.increment();
notificationExecutorService.execute(() -> {
try {
findListener(channel).ifPresent(MessageAvailabilityListener::handleMessagesPersisted);
findListener(channel).ifPresent(listener -> {
if (!listener.handleMessagesPersisted()) {
removeMessageAvailabilityListener(listener);
}
});
} catch (final Exception e) {
logger.warn("Unexpected error handling messages persisted", e);
}

View File

@@ -57,24 +57,33 @@ import org.whispersystems.websocket.messages.WebSocketResponseMessage;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class WebSocketConnection implements MessageAvailabilityListener, DisplacedPresenceListener {
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Histogram messageTime = metricRegistry.histogram(name(MessageController.class, "message_delivery_duration"));
private static final Histogram primaryDeviceMessageTime = metricRegistry.histogram(name(MessageController.class, "primary_device_message_delivery_duration"));
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
private static final Meter messageAvailableMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesAvailable"));
private static final Meter messagesPersistedMeter = metricRegistry.meter(name(WebSocketConnection.class, "messagesPersisted"));
private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent"));
private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures"));
private static final Meter discardedMessagesMeter = metricRegistry.meter(name(WebSocketConnection.class, "discardedMessages"));
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Histogram messageTime = metricRegistry.histogram(
name(MessageController.class, "message_delivery_duration"));
private static final Histogram primaryDeviceMessageTime = metricRegistry.histogram(
name(MessageController.class, "primary_device_message_delivery_duration"));
private static final Meter sendMessageMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_message"));
private static final Meter messageAvailableMeter = metricRegistry.meter(
name(WebSocketConnection.class, "messagesAvailable"));
private static final Meter messagesPersistedMeter = metricRegistry.meter(
name(WebSocketConnection.class, "messagesPersisted"));
private static final Meter bytesSentMeter = metricRegistry.meter(name(WebSocketConnection.class, "bytes_sent"));
private static final Meter sendFailuresMeter = metricRegistry.meter(name(WebSocketConnection.class, "send_failures"));
private static final Meter discardedMessagesMeter = metricRegistry.meter(
name(WebSocketConnection.class, "discardedMessages"));
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class, "initialQueueLength");
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry");
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class, "clientNonSuccessResponse");
private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message";
private static final String INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME = name(WebSocketConnection.class,
"initialQueueLength");
private static final String INITIAL_QUEUE_DRAIN_TIMER_NAME = name(WebSocketConnection.class, "drainInitialQueue");
private static final String SLOW_QUEUE_DRAIN_COUNTER_NAME = name(WebSocketConnection.class, "slowQueueDrain");
private static final String QUEUE_DRAIN_RETRY_COUNTER_NAME = name(WebSocketConnection.class, "queueDrainRetry");
private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement");
private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class,
"clientNonSuccessResponse");
private static final String CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class,
"messageAvailableAfterClientClosed");
private static final String STATUS_CODE_TAG = "status";
private static final String STATUS_MESSAGE_TAG = "message";
private static final long SLOW_DRAIN_THRESHOLD = 10_000;
@@ -350,19 +359,34 @@ public class WebSocketConnection implements MessageAvailabilityListener, Displac
}
@Override
public void handleNewMessagesAvailable() {
public boolean handleNewMessagesAvailable() {
if (!client.isOpen()) {
// The client may become closed without successful removal of references to the `MessageAvailabilityListener`
Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment();
return false;
}
messageAvailableMeter.mark();
storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);
processStoredMessages();
return true;
}
@Override
public void handleMessagesPersisted() {
public boolean handleMessagesPersisted() {
if (!client.isOpen()) {
// The client may become without successful removal of references to the `MessageAvailabilityListener`
Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment();
return false;
}
messagesPersistedMeter.mark();
storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);
processStoredMessages();
return true;
}
@Override