From a2f7afcb6823eb61e4040c323b057429add26962 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Fri, 16 May 2025 13:13:32 -0400 Subject: [PATCH] Detect and recycle WebSockets stuck in connecting state. --- .../securesms/dependencies/AppDependencies.kt | 2 +- .../ApplicationDependencyProvider.java | 4 +-- .../dependencies/NetworkDependenciesModule.kt | 2 +- .../messages/IncomingMessageObserver.kt | 15 +++++--- .../messages/NetworkConnectionListener.kt | 8 ++--- .../net/SignalWebSocketHealthMonitor.kt | 36 +++++++++++++++++++ .../MockApplicationDependencyProvider.kt | 2 +- .../api/websocket/SignalWebSocket.kt | 2 +- 8 files changed, 57 insertions(+), 14 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt index b7e7a323cb..6fdc716ab4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt @@ -403,7 +403,7 @@ object AppDependencies { fun provideMegaphoneRepository(): MegaphoneRepository fun provideEarlyMessageCache(): EarlyMessageCache fun provideMessageNotifier(): MessageNotifier - fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket): IncomingMessageObserver + fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): IncomingMessageObserver fun provideTrimThreadsByDateManager(): TrimThreadsByDateManager fun provideViewOnceMessageManager(): ViewOnceMessageManager fun provideExpiringStoriesManager(): ExpiringStoriesManager diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index f6fcce5ed0..b210760c0b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -224,8 +224,8 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider { } @Override - public @NonNull IncomingMessageObserver provideIncomingMessageObserver(@NonNull SignalWebSocket.AuthenticatedWebSocket webSocket) { - return new IncomingMessageObserver(context, webSocket); + public @NonNull IncomingMessageObserver provideIncomingMessageObserver(@NonNull SignalWebSocket.AuthenticatedWebSocket webSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket) { + return new IncomingMessageObserver(context, webSocket, unauthWebSocket); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt index b61d279753..8753e831c4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt @@ -92,7 +92,7 @@ class NetworkDependenciesModule( val signalServiceMessageSender: SignalServiceMessageSender by _signalServiceMessageSender val incomingMessageObserver: IncomingMessageObserver by lazy { - provider.provideIncomingMessageObserver(authWebSocket) + provider.provideIncomingMessageObserver(authWebSocket, unauthWebSocket) } val pushServiceSocket: PushServiceSocket by lazy { diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index 9d93bb7eb7..10963e1660 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -60,7 +60,11 @@ import kotlin.time.Duration.Companion.seconds * This class is responsible for keeping the authenticated websocket open based on the app's state for incoming messages and * observing new inbound messages received over the websocket. */ -class IncomingMessageObserver(private val context: Application, private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket) { +class IncomingMessageObserver( + private val context: Application, + private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket, + private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket +) { companion object { private val TAG = Log.tag(IncomingMessageObserver::class.java) @@ -89,18 +93,21 @@ class IncomingMessageObserver(private val context: Application, private val auth private val connectionNecessarySemaphore = Semaphore(0) private var previousProxyInfo: ProxyInfo? = null private val networkConnectionListener = NetworkConnectionListener( - context, - { isNetworkUnavailable -> + context = context, + onNetworkLost = { isNetworkUnavailable -> lock.withLock { AppDependencies.libsignalNetwork.onNetworkChange() if (isNetworkUnavailable()) { Log.w(TAG, "Lost network connection. Resetting the drained state.") decryptionDrained = false + authWebSocket.disconnect() + // TODO [no-more-rest] Move the connection listener to a neutral location so this isn't passed in + unauthWebSocket.disconnect() } connectionNecessarySemaphore.release() } }, - { proxyInfo -> + onProxySettingsChanged = { proxyInfo -> if (proxyInfo != previousProxyInfo) { val networkReset = AppDependencies.onSystemHttpProxyChange(proxyInfo?.host, proxyInfo?.port) if (networkReset) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/NetworkConnectionListener.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/NetworkConnectionListener.kt index e5854e6221..34d58ae707 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/NetworkConnectionListener.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/NetworkConnectionListener.kt @@ -42,25 +42,25 @@ class NetworkConnectionListener(private val context: Context, private val onNetw override fun onBlockedStatusChanged(network: Network, blocked: Boolean) { super.onBlockedStatusChanged(network, blocked) - Log.d(TAG, "ConnectivityManager.NetworkCallback onBlockedStatusChanged()") + Log.d(TAG, "ConnectivityManager.NetworkCallback onBlockedStatusChanged($network, $blocked)") onNetworkLost { blocked } } override fun onAvailable(network: Network) { super.onAvailable(network) - Log.d(TAG, "ConnectivityManager.NetworkCallback onAvailable()") + Log.d(TAG, "ConnectivityManager.NetworkCallback onAvailable($network)") onNetworkLost { false } } override fun onLost(network: Network) { super.onLost(network) - Log.d(TAG, "ConnectivityManager.NetworkCallback onLost()") + Log.d(TAG, "ConnectivityManager.NetworkCallback onLost($network)") onNetworkLost { true } } override fun onLinkPropertiesChanged(network: Network, linkProperties: LinkProperties) { super.onLinkPropertiesChanged(network, linkProperties) - Log.d(TAG, "ConnectivityManager.NetworkCallback onLinkPropertiesChanged()") + Log.d(TAG, "ConnectivityManager.NetworkCallback onLinkPropertiesChanged($network)") onProxySettingsChanged(linkProperties.httpProxy) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt index 328f2037be..9a3aeaf091 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.kt @@ -7,9 +7,16 @@ package org.thoughtcrime.securesms.net import io.reactivex.rxjava3.kotlin.subscribeBy import io.reactivex.rxjava3.schedulers.Schedulers +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.signal.core.util.logging.Log import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor.Companion.KEEP_ALIVE_SEND_CADENCE +import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor.Companion.KEEP_ALIVE_TIMEOUT import org.thoughtcrime.securesms.util.TextSecurePreferences import org.whispersystems.signalservice.api.util.SleepTimer import org.whispersystems.signalservice.api.websocket.HealthMonitor @@ -49,6 +56,10 @@ class SignalWebSocketHealthMonitor( private var needsKeepAlive = false private var lastKeepAliveReceived: Duration = 0.seconds + private val scope = CoroutineScope(Dispatchers.IO) + private var connectingTimeoutJob: Job? = null + private var failedInConnecting: Boolean = false + @Suppress("CheckResult") fun monitor(webSocket: SignalWebSocket) { executor.execute { @@ -69,11 +80,22 @@ class SignalWebSocketHealthMonitor( private fun onStateChanged(connectionState: WebSocketConnectionState) { executor.execute { + Log.v(TAG, "${webSocket?.connectionName} onStateChange($connectionState)") + when (connectionState) { + WebSocketConnectionState.CONNECTING -> { + connectingTimeoutJob?.cancel() + connectingTimeoutJob = scope.launch { + delay(if (failedInConnecting) 60.seconds else 30.seconds) + Log.w(TAG, "${webSocket?.connectionName} Did not leave CONNECTING state, starting over") + onConnectingTimeout() + } + } WebSocketConnectionState.CONNECTED -> { if (webSocket is SignalWebSocket.AuthenticatedWebSocket) { TextSecurePreferences.setUnauthorizedReceived(AppDependencies.application, false) } + failedInConnecting = false } WebSocketConnectionState.AUTHENTICATION_FAILED -> { if (webSocket is SignalWebSocket.AuthenticatedWebSocket) { @@ -91,6 +113,13 @@ class SignalWebSocketHealthMonitor( needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED + if (connectionState != WebSocketConnectionState.CONNECTING) { + connectingTimeoutJob?.let { + it.cancel() + connectingTimeoutJob = null + } + } + updateKeepAliveSenderStatus() } } @@ -112,6 +141,13 @@ class SignalWebSocketHealthMonitor( } } + private fun onConnectingTimeout() { + executor.execute { + webSocket?.forceNewWebSocket() + failedInConnecting = true + } + } + private fun updateKeepAliveSenderStatus() { if (keepAliveSender == null && sendKeepAlives()) { keepAliveSender = KeepAliveSender().also { it.start() } diff --git a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt index 98650e1673..cec0b8db73 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt @@ -117,7 +117,7 @@ class MockApplicationDependencyProvider : AppDependencies.Provider { return mockk(relaxed = true) } - override fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket): IncomingMessageObserver { + override fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): IncomingMessageObserver { return mockk(relaxed = true) } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt index 1e64b5dfa8..f493c73d77 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt @@ -47,7 +47,7 @@ sealed class SignalWebSocket( } private var connection: WebSocketConnection? = null - private val connectionName + val connectionName get() = connection?.name ?: "[null]" private val _state: BehaviorSubject = BehaviorSubject.createDefault(WebSocketConnectionState.DISCONNECTED)