Remove pendingResponses; libsignal-net now completes futures with disconnectReason.

This commit is contained in:
andrew-signal
2025-05-05 09:07:43 -05:00
committed by Michelle Tang
parent 9b9888565b
commit bc94a92f68

View File

@@ -31,7 +31,6 @@ import org.whispersystems.signalservice.internal.util.whenComplete
import java.io.IOException import java.io.IOException
import java.net.SocketException import java.net.SocketException
import java.time.Instant import java.time.Instant
import java.util.Collections
import java.util.Optional import java.util.Optional
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors import java.util.concurrent.Executors
@@ -65,7 +64,6 @@ class LibSignalChatConnection(
private val healthMonitor: HealthMonitor private val healthMonitor: HealthMonitor
) : WebSocketConnection { ) : WebSocketConnection {
private val incomingRequestQueue = LinkedBlockingQueue<WebSocketRequestMessage>() private val incomingRequestQueue = LinkedBlockingQueue<WebSocketRequestMessage>()
private val pendingResponses = Collections.synchronizedSet(HashSet<SingleSubject<WebsocketResponse>>())
// One of the more nasty parts of this is that libsignal-net does not expose, nor does it ever // One of the more nasty parts of this is that libsignal-net does not expose, nor does it ever
// intend to expose, the ID of the incoming "request" to the app layer. Instead, the app layer // intend to expose, the ID of the incoming "request" to the app layer. Instead, the app layer
@@ -151,7 +149,6 @@ class LibSignalChatConnection(
// there is no ackSender for a pseudoId gracefully in sendResponse. // there is no ackSender for a pseudoId gracefully in sendResponse.
ackSenderForInternalPseudoId.clear() ackSenderForInternalPseudoId.clear()
// There's no sense in resetting nextIncomingMessageInternalPseudoId. // There's no sense in resetting nextIncomingMessageInternalPseudoId.
pendingResponses.clear()
} }
init { init {
@@ -303,11 +300,9 @@ class LibSignalChatConnection(
try { try {
sendRequest(request).subscribe( sendRequest(request).subscribe(
{ response -> { response ->
pendingResponses.remove(single)
single.onSuccess(response) single.onSuccess(response)
}, },
{ error -> { error ->
pendingResponses.remove(single)
single.onError(error) single.onError(error)
} }
) )
@@ -333,11 +328,9 @@ class LibSignalChatConnection(
is DeviceDeregisteredException -> NonSuccessfulResponseCodeException(403) is DeviceDeregisteredException -> NonSuccessfulResponseCodeException(403)
else -> SocketException("Closed unexpectedly") else -> SocketException("Closed unexpectedly")
} }
pendingResponses.remove(single)
single.onError(downstreamThrowable) single.onError(downstreamThrowable)
} }
) )
pendingResponses.add(single)
return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()) return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
} }
@@ -356,19 +349,20 @@ class LibSignalChatConnection(
} }
// Here success means "we received the response" even if it is reporting an error. // Here success means "we received the response" even if it is reporting an error.
// This is consistent with the behavior of the OkHttpWebSocketConnection. // This is consistent with the behavior of the OkHttpWebSocketConnection.
pendingResponses.remove(single)
single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatConnection is UnauthenticatedChatConnection))) single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatConnection is UnauthenticatedChatConnection)))
}, },
onFailure = { throwable -> onFailure = { throwable ->
Log.w(TAG, "$name [sendRequest] Failure:", throwable) Log.w(TAG, "$name [sendRequest] Failure:", throwable)
val downstreamThrowable = when (throwable) {
is ConnectionInvalidatedException -> NonSuccessfulResponseCodeException(4401)
// The clients of WebSocketConnection are often sensitive to the exact type of exception returned. // The clients of WebSocketConnection are often sensitive to the exact type of exception returned.
// This is the exception that OkHttpWebSocketConnection throws in the closest scenario to this, when // This is the exception that OkHttpWebSocketConnection throws in the closest scenario to this, when
// the connection fails before the request completes. // the connection fails before the request completes.
pendingResponses.remove(single) else -> SocketException("Failed to get response for request")
single.onError(SocketException("Failed to get response for request")) }
single.onError(downstreamThrowable)
} }
) )
pendingResponses.add(single)
return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()) return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
} }
} }
@@ -562,19 +556,6 @@ class LibSignalChatConnection(
Log.i(TAG, "$name disconnected") Log.i(TAG, "$name disconnected")
} else { } else {
Log.i(TAG, "$name connection unexpectedly closed", disconnectReason) Log.i(TAG, "$name connection unexpectedly closed", disconnectReason)
val downstreamThrowable = when (disconnectReason) {
// This matches the behavior of OkHttpWebSocketConnection when the connection terminates
// by the server before the response is received.
is ConnectionInvalidatedException -> NonSuccessfulResponseCodeException(4401)
else -> disconnectReason
}
synchronized(pendingResponses) {
for (pendingResponse in pendingResponses) {
pendingResponse.onError(downstreamThrowable)
}
}
} }
chatConnection = null chatConnection = null
state.onNext(WebSocketConnectionState.DISCONNECTED) state.onNext(WebSocketConnectionState.DISCONNECTED)