From a043b4e5730adb3b48b3bb4984da9fe26791c7b0 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Wed, 25 Jun 2025 14:12:24 -0400 Subject: [PATCH] Try to run IncomingMessageObserver more aggressively. --- .../securesms/messages/IncomingMessageObserver.kt | 10 ++++++++-- .../securesms/net/SignalWebSocketHealthMonitor.kt | 2 +- .../signalservice/api/websocket/SignalWebSocket.kt | 13 ++++++++++--- .../websocket/OkHttpWebSocketConnection.java | 4 ++-- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index ea5f3bedfa..3efd305c1a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -175,6 +175,12 @@ class IncomingMessageObserver( } } } + + authWebSocket.addKeepAliveChangeListener { + lock.withLock { + connectionNecessarySemaphore.release() + } + } } fun notifyRegistrationStateChanged() { @@ -236,12 +242,12 @@ class IncomingMessageObserver( val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection" - Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Connected: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket") + Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Open or Keep-alives: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket") return conclusion } private fun isConnectionAvailable(): Boolean { - return authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED + return authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED || authWebSocket.shouldSendKeepAlives() } private fun waitForConnectionNecessary() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt index 9a3aeaf091..cd5bf653f0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt @@ -74,7 +74,7 @@ class SignalWebSocketHealthMonitor( .distinctUntilChanged() .subscribeBy { onStateChanged(it) } - webSocket.keepAliveChangedListener = { executor.execute(this::updateKeepAliveSenderStatus) } + webSocket.addKeepAliveChangeListener { executor.execute(this::updateKeepAliveSenderStatus) } } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt index f493c73d77..2807c16c9f 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt @@ -23,10 +23,13 @@ import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessa import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage import org.whispersystems.signalservice.internal.websocket.WebsocketResponse import java.io.IOException +import java.util.concurrent.CopyOnWriteArraySet import java.util.concurrent.TimeoutException import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +private typealias Listener = () -> Unit + /** * Base wrapper around a [WebSocketConnection] to provide a more developer friend interface to websocket * interactions. @@ -54,7 +57,7 @@ sealed class SignalWebSocket( protected var disposable: CompositeDisposable = CompositeDisposable() private val keepAliveTokens: MutableSet = mutableSetOf() - var keepAliveChangedListener: (() -> Unit)? = null + private val keepAliveChangeListeners: MutableSet = CopyOnWriteArraySet() private var delayedDisconnectThread: DelayedDisconnectThread? = null @@ -123,7 +126,7 @@ sealed class SignalWebSocket( } if (changed) { - keepAliveChangedListener?.invoke() + keepAliveChangeListeners.forEach { it() } } } @@ -132,10 +135,14 @@ sealed class SignalWebSocket( if (keepAliveTokens.remove(token)) { Log.v(TAG, "$connectionName Removing keepAliveToken: $token, remaining: $keepAliveTokens") startDelayedDisconnectIfNecessary() - keepAliveChangedListener?.invoke() + keepAliveChangeListeners.forEach { it() } } } + fun addKeepAliveChangeListener(listener: Listener) { + keepAliveChangeListeners.add(listener) + } + fun request(request: WebSocketRequestMessage): Single { return try { delayedDisconnectThread?.resetLastInteractionTime() diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/OkHttpWebSocketConnection.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/OkHttpWebSocketConnection.java index e4df5261e7..e05eb14f47 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/OkHttpWebSocketConnection.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/OkHttpWebSocketConnection.java @@ -327,7 +327,7 @@ public class OkHttpWebSocketConnection extends WebSocketListener implements WebS @Override public synchronized void onClosed(WebSocket webSocket, int code, String reason) { - log("onClose()"); + log("onClose(" + code + ")"); webSocketState.onNext(WebSocketConnectionState.DISCONNECTED); cleanupAfterShutdown(code); @@ -380,7 +380,7 @@ public class OkHttpWebSocketConnection extends WebSocketListener implements WebS @Override public synchronized void onClosing(WebSocket webSocket, int code, String reason) { - log("onClosing()"); + log("onClosing(" + code + ")"); webSocketState.onNext(WebSocketConnectionState.DISCONNECTING); webSocket.close(1000, "OK"); }