From bc94a92f6814380e850c774dbb78136fd48fc0a2 Mon Sep 17 00:00:00 2001 From: andrew-signal Date: Mon, 5 May 2025 09:07:43 -0500 Subject: [PATCH] Remove pendingResponses; libsignal-net now completes futures with disconnectReason. --- .../websocket/LibSignalChatConnection.kt | 35 +++++-------------- 1 file changed, 8 insertions(+), 27 deletions(-) diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt index 7c52b9fd58..9a96a9339f 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt @@ -31,7 +31,6 @@ import org.whispersystems.signalservice.internal.util.whenComplete import java.io.IOException import java.net.SocketException import java.time.Instant -import java.util.Collections import java.util.Optional import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -65,7 +64,6 @@ class LibSignalChatConnection( private val healthMonitor: HealthMonitor ) : WebSocketConnection { private val incomingRequestQueue = LinkedBlockingQueue() - private val pendingResponses = Collections.synchronizedSet(HashSet>()) // One of the more nasty parts of this is that libsignal-net does not expose, nor does it ever // intend to expose, the ID of the incoming "request" to the app layer. Instead, the app layer @@ -151,7 +149,6 @@ class LibSignalChatConnection( // there is no ackSender for a pseudoId gracefully in sendResponse. ackSenderForInternalPseudoId.clear() // There's no sense in resetting nextIncomingMessageInternalPseudoId. - pendingResponses.clear() } init { @@ -303,11 +300,9 @@ class LibSignalChatConnection( try { sendRequest(request).subscribe( { response -> - pendingResponses.remove(single) single.onSuccess(response) }, { error -> - pendingResponses.remove(single) single.onError(error) } ) @@ -333,11 +328,9 @@ class LibSignalChatConnection( is DeviceDeregisteredException -> NonSuccessfulResponseCodeException(403) else -> SocketException("Closed unexpectedly") } - pendingResponses.remove(single) single.onError(downstreamThrowable) } ) - pendingResponses.add(single) return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()) } @@ -356,19 +349,20 @@ class LibSignalChatConnection( } // Here success means "we received the response" even if it is reporting an error. // This is consistent with the behavior of the OkHttpWebSocketConnection. - pendingResponses.remove(single) single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatConnection is UnauthenticatedChatConnection))) }, onFailure = { throwable -> Log.w(TAG, "$name [sendRequest] Failure:", throwable) - // The clients of WebSocketConnection are often sensitive to the exact type of exception returned. - // This is the exception that OkHttpWebSocketConnection throws in the closest scenario to this, when - // the connection fails before the request completes. - pendingResponses.remove(single) - single.onError(SocketException("Failed to get response for request")) + val downstreamThrowable = when (throwable) { + is ConnectionInvalidatedException -> NonSuccessfulResponseCodeException(4401) + // The clients of WebSocketConnection are often sensitive to the exact type of exception returned. + // This is the exception that OkHttpWebSocketConnection throws in the closest scenario to this, when + // the connection fails before the request completes. + else -> SocketException("Failed to get response for request") + } + single.onError(downstreamThrowable) } ) - pendingResponses.add(single) return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()) } } @@ -562,19 +556,6 @@ class LibSignalChatConnection( Log.i(TAG, "$name disconnected") } else { Log.i(TAG, "$name connection unexpectedly closed", disconnectReason) - - val downstreamThrowable = when (disconnectReason) { - // This matches the behavior of OkHttpWebSocketConnection when the connection terminates - // by the server before the response is received. - is ConnectionInvalidatedException -> NonSuccessfulResponseCodeException(4401) - else -> disconnectReason - } - - synchronized(pendingResponses) { - for (pendingResponse in pendingResponses) { - pendingResponse.onError(downstreamThrowable) - } - } } chatConnection = null state.onNext(WebSocketConnectionState.DISCONNECTED)