Try to run IncomingMessageObserver more aggressively.

This commit is contained in:
Cody Henthorne
2025-06-25 14:12:24 -04:00
parent c0340be3ce
commit a043b4e573
4 changed files with 21 additions and 8 deletions

View File

@@ -175,6 +175,12 @@ class IncomingMessageObserver(
} }
} }
} }
authWebSocket.addKeepAliveChangeListener {
lock.withLock {
connectionNecessarySemaphore.release()
}
}
} }
fun notifyRegistrationStateChanged() { fun notifyRegistrationStateChanged() {
@@ -236,12 +242,12 @@ class IncomingMessageObserver(
val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection" val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection"
Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Connected: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket") Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Open or Keep-alives: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket")
return conclusion return conclusion
} }
private fun isConnectionAvailable(): Boolean { private fun isConnectionAvailable(): Boolean {
return authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED return authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED || authWebSocket.shouldSendKeepAlives()
} }
private fun waitForConnectionNecessary() { private fun waitForConnectionNecessary() {

View File

@@ -74,7 +74,7 @@ class SignalWebSocketHealthMonitor(
.distinctUntilChanged() .distinctUntilChanged()
.subscribeBy { onStateChanged(it) } .subscribeBy { onStateChanged(it) }
webSocket.keepAliveChangedListener = { executor.execute(this::updateKeepAliveSenderStatus) } webSocket.addKeepAliveChangeListener { executor.execute(this::updateKeepAliveSenderStatus) }
} }
} }

View File

@@ -23,10 +23,13 @@ import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessa
import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage
import org.whispersystems.signalservice.internal.websocket.WebsocketResponse import org.whispersystems.signalservice.internal.websocket.WebsocketResponse
import java.io.IOException import java.io.IOException
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
private typealias Listener = () -> Unit
/** /**
* Base wrapper around a [WebSocketConnection] to provide a more developer friend interface to websocket * Base wrapper around a [WebSocketConnection] to provide a more developer friend interface to websocket
* interactions. * interactions.
@@ -54,7 +57,7 @@ sealed class SignalWebSocket(
protected var disposable: CompositeDisposable = CompositeDisposable() protected var disposable: CompositeDisposable = CompositeDisposable()
private val keepAliveTokens: MutableSet<String> = mutableSetOf() private val keepAliveTokens: MutableSet<String> = mutableSetOf()
var keepAliveChangedListener: (() -> Unit)? = null private val keepAliveChangeListeners: MutableSet<Listener> = CopyOnWriteArraySet()
private var delayedDisconnectThread: DelayedDisconnectThread? = null private var delayedDisconnectThread: DelayedDisconnectThread? = null
@@ -123,7 +126,7 @@ sealed class SignalWebSocket(
} }
if (changed) { if (changed) {
keepAliveChangedListener?.invoke() keepAliveChangeListeners.forEach { it() }
} }
} }
@@ -132,10 +135,14 @@ sealed class SignalWebSocket(
if (keepAliveTokens.remove(token)) { if (keepAliveTokens.remove(token)) {
Log.v(TAG, "$connectionName Removing keepAliveToken: $token, remaining: $keepAliveTokens") Log.v(TAG, "$connectionName Removing keepAliveToken: $token, remaining: $keepAliveTokens")
startDelayedDisconnectIfNecessary() startDelayedDisconnectIfNecessary()
keepAliveChangedListener?.invoke() keepAliveChangeListeners.forEach { it() }
} }
} }
fun addKeepAliveChangeListener(listener: Listener) {
keepAliveChangeListeners.add(listener)
}
fun request(request: WebSocketRequestMessage): Single<WebsocketResponse> { fun request(request: WebSocketRequestMessage): Single<WebsocketResponse> {
return try { return try {
delayedDisconnectThread?.resetLastInteractionTime() delayedDisconnectThread?.resetLastInteractionTime()

View File

@@ -327,7 +327,7 @@ public class OkHttpWebSocketConnection extends WebSocketListener implements WebS
@Override @Override
public synchronized void onClosed(WebSocket webSocket, int code, String reason) { public synchronized void onClosed(WebSocket webSocket, int code, String reason) {
log("onClose()"); log("onClose(" + code + ")");
webSocketState.onNext(WebSocketConnectionState.DISCONNECTED); webSocketState.onNext(WebSocketConnectionState.DISCONNECTED);
cleanupAfterShutdown(code); cleanupAfterShutdown(code);
@@ -380,7 +380,7 @@ public class OkHttpWebSocketConnection extends WebSocketListener implements WebS
@Override @Override
public synchronized void onClosing(WebSocket webSocket, int code, String reason) { public synchronized void onClosing(WebSocket webSocket, int code, String reason) {
log("onClosing()"); log("onClosing(" + code + ")");
webSocketState.onNext(WebSocketConnectionState.DISCONNECTING); webSocketState.onNext(WebSocketConnectionState.DISCONNECTING);
webSocket.close(1000, "OK"); webSocket.close(1000, "OK");
} }