Detect and recycle WebSockets stuck in connecting state.

This commit is contained in:
Cody Henthorne
2025-05-16 13:13:32 -04:00
committed by GitHub
parent b6c033b075
commit a2f7afcb68
8 changed files with 57 additions and 14 deletions

View File

@@ -403,7 +403,7 @@ object AppDependencies {
fun provideMegaphoneRepository(): MegaphoneRepository
fun provideEarlyMessageCache(): EarlyMessageCache
fun provideMessageNotifier(): MessageNotifier
fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket): IncomingMessageObserver
fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): IncomingMessageObserver
fun provideTrimThreadsByDateManager(): TrimThreadsByDateManager
fun provideViewOnceMessageManager(): ViewOnceMessageManager
fun provideExpiringStoriesManager(): ExpiringStoriesManager

View File

@@ -224,8 +224,8 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull IncomingMessageObserver provideIncomingMessageObserver(@NonNull SignalWebSocket.AuthenticatedWebSocket webSocket) {
return new IncomingMessageObserver(context, webSocket);
public @NonNull IncomingMessageObserver provideIncomingMessageObserver(@NonNull SignalWebSocket.AuthenticatedWebSocket webSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket) {
return new IncomingMessageObserver(context, webSocket, unauthWebSocket);
}
@Override

View File

@@ -92,7 +92,7 @@ class NetworkDependenciesModule(
val signalServiceMessageSender: SignalServiceMessageSender by _signalServiceMessageSender
val incomingMessageObserver: IncomingMessageObserver by lazy {
provider.provideIncomingMessageObserver(authWebSocket)
provider.provideIncomingMessageObserver(authWebSocket, unauthWebSocket)
}
val pushServiceSocket: PushServiceSocket by lazy {

View File

@@ -60,7 +60,11 @@ import kotlin.time.Duration.Companion.seconds
* This class is responsible for keeping the authenticated websocket open based on the app's state for incoming messages and
* observing new inbound messages received over the websocket.
*/
class IncomingMessageObserver(private val context: Application, private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket) {
class IncomingMessageObserver(
private val context: Application,
private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket,
private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket
) {
companion object {
private val TAG = Log.tag(IncomingMessageObserver::class.java)
@@ -89,18 +93,21 @@ class IncomingMessageObserver(private val context: Application, private val auth
private val connectionNecessarySemaphore = Semaphore(0)
private var previousProxyInfo: ProxyInfo? = null
private val networkConnectionListener = NetworkConnectionListener(
context,
{ isNetworkUnavailable ->
context = context,
onNetworkLost = { isNetworkUnavailable ->
lock.withLock {
AppDependencies.libsignalNetwork.onNetworkChange()
if (isNetworkUnavailable()) {
Log.w(TAG, "Lost network connection. Resetting the drained state.")
decryptionDrained = false
authWebSocket.disconnect()
// TODO [no-more-rest] Move the connection listener to a neutral location so this isn't passed in
unauthWebSocket.disconnect()
}
connectionNecessarySemaphore.release()
}
},
{ proxyInfo ->
onProxySettingsChanged = { proxyInfo ->
if (proxyInfo != previousProxyInfo) {
val networkReset = AppDependencies.onSystemHttpProxyChange(proxyInfo?.host, proxyInfo?.port)
if (networkReset) {

View File

@@ -42,25 +42,25 @@ class NetworkConnectionListener(private val context: Context, private val onNetw
override fun onBlockedStatusChanged(network: Network, blocked: Boolean) {
super.onBlockedStatusChanged(network, blocked)
Log.d(TAG, "ConnectivityManager.NetworkCallback onBlockedStatusChanged()")
Log.d(TAG, "ConnectivityManager.NetworkCallback onBlockedStatusChanged($network, $blocked)")
onNetworkLost { blocked }
}
override fun onAvailable(network: Network) {
super.onAvailable(network)
Log.d(TAG, "ConnectivityManager.NetworkCallback onAvailable()")
Log.d(TAG, "ConnectivityManager.NetworkCallback onAvailable($network)")
onNetworkLost { false }
}
override fun onLost(network: Network) {
super.onLost(network)
Log.d(TAG, "ConnectivityManager.NetworkCallback onLost()")
Log.d(TAG, "ConnectivityManager.NetworkCallback onLost($network)")
onNetworkLost { true }
}
override fun onLinkPropertiesChanged(network: Network, linkProperties: LinkProperties) {
super.onLinkPropertiesChanged(network, linkProperties)
Log.d(TAG, "ConnectivityManager.NetworkCallback onLinkPropertiesChanged()")
Log.d(TAG, "ConnectivityManager.NetworkCallback onLinkPropertiesChanged($network)")
onProxySettingsChanged(linkProperties.httpProxy)
}
}

View File

@@ -7,9 +7,16 @@ package org.thoughtcrime.securesms.net
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor.Companion.KEEP_ALIVE_SEND_CADENCE
import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor.Companion.KEEP_ALIVE_TIMEOUT
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.util.SleepTimer
import org.whispersystems.signalservice.api.websocket.HealthMonitor
@@ -49,6 +56,10 @@ class SignalWebSocketHealthMonitor(
private var needsKeepAlive = false
private var lastKeepAliveReceived: Duration = 0.seconds
private val scope = CoroutineScope(Dispatchers.IO)
private var connectingTimeoutJob: Job? = null
private var failedInConnecting: Boolean = false
@Suppress("CheckResult")
fun monitor(webSocket: SignalWebSocket) {
executor.execute {
@@ -69,11 +80,22 @@ class SignalWebSocketHealthMonitor(
private fun onStateChanged(connectionState: WebSocketConnectionState) {
executor.execute {
Log.v(TAG, "${webSocket?.connectionName} onStateChange($connectionState)")
when (connectionState) {
WebSocketConnectionState.CONNECTING -> {
connectingTimeoutJob?.cancel()
connectingTimeoutJob = scope.launch {
delay(if (failedInConnecting) 60.seconds else 30.seconds)
Log.w(TAG, "${webSocket?.connectionName} Did not leave CONNECTING state, starting over")
onConnectingTimeout()
}
}
WebSocketConnectionState.CONNECTED -> {
if (webSocket is SignalWebSocket.AuthenticatedWebSocket) {
TextSecurePreferences.setUnauthorizedReceived(AppDependencies.application, false)
}
failedInConnecting = false
}
WebSocketConnectionState.AUTHENTICATION_FAILED -> {
if (webSocket is SignalWebSocket.AuthenticatedWebSocket) {
@@ -91,6 +113,13 @@ class SignalWebSocketHealthMonitor(
needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED
if (connectionState != WebSocketConnectionState.CONNECTING) {
connectingTimeoutJob?.let {
it.cancel()
connectingTimeoutJob = null
}
}
updateKeepAliveSenderStatus()
}
}
@@ -112,6 +141,13 @@ class SignalWebSocketHealthMonitor(
}
}
private fun onConnectingTimeout() {
executor.execute {
webSocket?.forceNewWebSocket()
failedInConnecting = true
}
}
private fun updateKeepAliveSenderStatus() {
if (keepAliveSender == null && sendKeepAlives()) {
keepAliveSender = KeepAliveSender().also { it.start() }

View File

@@ -117,7 +117,7 @@ class MockApplicationDependencyProvider : AppDependencies.Provider {
return mockk(relaxed = true)
}
override fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket): IncomingMessageObserver {
override fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): IncomingMessageObserver {
return mockk(relaxed = true)
}

View File

@@ -47,7 +47,7 @@ sealed class SignalWebSocket(
}
private var connection: WebSocketConnection? = null
private val connectionName
val connectionName
get() = connection?.name ?: "[null]"
private val _state: BehaviorSubject<WebSocketConnectionState> = BehaviorSubject.createDefault(WebSocketConnectionState.DISCONNECTED)