From 43a1c93961552ed2442dfd1cbc05d915c3e75334 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Fri, 8 May 2026 13:15:39 -0400 Subject: [PATCH] Make network calling infrastructure more coroutine friendly. --- .../dependencies/KeyTransparencyApi.kt | 5 +- .../signalservice/api/NetworkResult.kt | 40 ++++++++++ .../api/attachment/AttachmentApi.kt | 17 +---- .../signalservice/api/message/MessageApi.kt | 5 +- .../signalservice/api/profiles/ProfileApi.kt | 30 ++++---- .../api/websocket/SignalWebSocket.kt | 74 +++++++++++++------ .../util/CompletableFutureExtensions.kt | 17 +++++ .../websocket/LibSignalChatConnection.kt | 37 +++++++++- .../internal/websocket/WebSocketConnection.kt | 5 ++ .../org/signal/network/api/UsernameApi.kt | 9 +-- 10 files changed, 174 insertions(+), 65 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/KeyTransparencyApi.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/KeyTransparencyApi.kt index 23ff43a303..b18a30cfbc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/KeyTransparencyApi.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/KeyTransparencyApi.kt @@ -3,7 +3,6 @@ package org.thoughtcrime.securesms.dependencies import org.signal.libsignal.keytrans.KeyTransparencyException import org.signal.libsignal.net.KeyTransparency.CheckMode import org.signal.libsignal.net.RequestResult -import org.signal.libsignal.net.getOrError import org.signal.libsignal.protocol.IdentityKey import org.signal.libsignal.protocol.ServiceId import org.thoughtcrime.securesms.database.model.KeyTransparencyStore @@ -15,8 +14,8 @@ import org.whispersystems.signalservice.api.websocket.SignalWebSocket class KeyTransparencyApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket) { suspend fun check(checkMode: CheckMode, aci: ServiceId.Aci, aciIdentityKey: IdentityKey, e164: String?, unidentifiedAccessKey: ByteArray?, usernameHash: ByteArray?, keyTransparencyStore: KeyTransparencyStore): RequestResult { - return unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection -> + return unauthWebSocket.runCatchingWithChatConnection { chatConnection -> chatConnection.keyTransparencyClient().check(checkMode, aci, aciIdentityKey, e164, unidentifiedAccessKey, usernameHash, keyTransparencyStore) - }.getOrError() + } } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt index ebf3792087..b1f7f1fd95 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt @@ -8,6 +8,7 @@ package org.whispersystems.signalservice.api import io.reactivex.rxjava3.core.Single import org.signal.core.util.concurrent.safeBlockingGet import org.whispersystems.signalservice.api.NetworkResult.ApplicationError +import org.whispersystems.signalservice.api.NetworkResult.Companion.fromWebSocket import org.whispersystems.signalservice.api.NetworkResult.StatusCodeError import org.whispersystems.signalservice.api.push.exceptions.MalformedRequestException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException @@ -150,6 +151,30 @@ sealed class NetworkResult( return fromWebSocket(webSocketResponseConverter) { signalWebSocket.request(request, timeout) } } + /** + * Coroutine-friendly variant of the [fromWebSocket] overload that takes a [WebSocketResponseConverter]. + */ + suspend fun fromWebSocketSuspend( + webSocketResponseConverter: WebSocketResponseConverter, + fetcher: suspend () -> WebsocketResponse + ): NetworkResult { + return try { + webSocketResponseConverter.convert(fetcher()) + } catch (e: kotlinx.coroutines.CancellationException) { + throw e + } catch (e: NonSuccessfulResponseCodeException) { + StatusCodeError(e) + } catch (e: IOException) { + NetworkError(e) + } catch (e: TimeoutException) { + NetworkError(PushNetworkException(e)) + } catch (e: InterruptedException) { + NetworkError(PushNetworkException(e)) + } catch (e: Throwable) { + ApplicationError(e) + } + } + /** * Wraps a local operation, [block], that may throw an exception that should be wrapped in an [ApplicationError] * and abort downstream network requests that directly depend on the output of the local operation. Should @@ -327,6 +352,21 @@ sealed class NetworkResult( } } + /** + * See [fallback]. + */ + suspend fun fallbackSuspend(predicate: (NetworkResult) -> Boolean = { true }, fallback: suspend () -> NetworkResult): NetworkResult { + if (this is Success) { + return this + } + + return if (predicate(this)) { + fallback() + } else { + this + } + } + /** * Takes the output of one [NetworkResult] and passes it as the input to another if the operation is successful. * If it's non-successful, the [result] lambda is not run, and instead the original failure will be propagated. diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt index 28f40a92f3..190508d9fd 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt @@ -8,10 +8,8 @@ package org.whispersystems.signalservice.api.attachment import kotlinx.coroutines.runBlocking import org.signal.core.util.logging.Log import org.signal.libsignal.net.AuthMessagesService -import org.signal.libsignal.net.AuthenticatedChatConnection import org.signal.libsignal.net.RequestResult import org.signal.libsignal.net.UploadTooLargeException -import org.signal.libsignal.net.getOrError import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId @@ -23,7 +21,6 @@ import org.whispersystems.signalservice.internal.push.PushAttachmentData import org.whispersystems.signalservice.internal.push.PushServiceSocket import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec -import java.io.IOException import java.io.InputStream import kotlin.jvm.optionals.getOrNull @@ -43,12 +40,10 @@ class AttachmentApi( * Gets a v4 attachment upload form, which provides the necessary information to upload an attachment. */ fun getAttachmentV4UploadForm(uploadSizeBytes: Long): RequestResult { - return try { - runBlocking { - authWebSocket.runWithChatConnection { chatConnection -> - AuthMessagesService(chatConnection as AuthenticatedChatConnection).getUploadForm(uploadSizeBytes) - } - }.getOrError().map { form -> + return runBlocking { + authWebSocket.runCatchingWithChatConnection { chatConnection -> + AuthMessagesService(chatConnection).getUploadForm(uploadSizeBytes) + }.map { form -> AttachmentUploadForm( cdn = form.cdn, key = form.key, @@ -56,10 +51,6 @@ class AttachmentApi( signedUploadLocation = form.signedUploadUrl.toString() ) } - } catch (e: IOException) { - RequestResult.RetryableNetworkError(e) - } catch (e: Throwable) { - RequestResult.ApplicationError(e) } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt index 095054b28e..8faa305bf0 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt @@ -12,7 +12,6 @@ import org.signal.libsignal.net.MultiRecipientSendAuthorization import org.signal.libsignal.net.MultiRecipientSendFailure import org.signal.libsignal.net.RequestResult import org.signal.libsignal.net.UnauthMessagesService -import org.signal.libsignal.net.getOrError import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.crypto.SealedSenderAccess import org.whispersystems.signalservice.api.websocket.SignalWebSocket @@ -79,9 +78,9 @@ class MessageApi( */ fun sendGroupMessage(body: ByteArray, auth: MultiRecipientSendAuthorization, timestamp: Long, online: Boolean, urgent: Boolean): RequestResult { return runBlocking { - unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection -> + unauthWebSocket.runCatchingWithChatConnection { chatConnection -> UnauthMessagesService(chatConnection).sendMultiRecipientMessage(body, timestamp, auth, online, urgent) - }.getOrError() + } } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt index a0872accdf..a2c0e2caf0 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt @@ -115,7 +115,7 @@ class ProfileApi( * - 404: Recipient is not a registered Signal user * - 429: Rate-limited */ - fun getVersionedProfileAndCredential(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult> { + suspend fun getVersionedProfileAndCredential(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult> { val profileVersion = profileKey.getProfileKeyVersion(aci.libSignalAci).serialize() val profileRequestContext = clientZkProfileOperations.createProfileKeyCredentialRequestContext(SecureRandom(), aci.libSignalAci, profileKey) val serializedProfileRequest = Hex.toStringCondensed(profileRequestContext.request.serialize()) @@ -124,12 +124,12 @@ class ProfileApi( val converter = ProfileAndCredentialResponseConverter(clientZkProfileOperations, profileRequestContext) return if (sealedSenderAccess == null) { - NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } + NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } } else { - NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) } - .fallback( + NetworkResult.fromWebSocketSuspend(converter) { unauthWebSocket.requestSuspend(request, sealedSenderAccess) } + .fallbackSuspend( predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, - fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } } + fallback = { NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } } ) } } @@ -142,7 +142,7 @@ class ProfileApi( * - 404: Recipient is not a registered Signal user * - 429: Rate-limited */ - fun getVersionedProfile(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult { + suspend fun getVersionedProfile(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult { val profileKeyIdentifier = profileKey.getProfileKeyVersion(aci.libSignalAci) val profileVersion = profileKeyIdentifier.serialize() @@ -150,12 +150,12 @@ class ProfileApi( val converter = NetworkResult.DefaultWebSocketConverter(SignalServiceProfile::class) return if (sealedSenderAccess == null) { - NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } + NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } } else { - NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) } - .fallback( + NetworkResult.fromWebSocketSuspend(converter) { unauthWebSocket.requestSuspend(request, sealedSenderAccess) } + .fallbackSuspend( predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, - fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } } + fallback = { NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } } ) } } @@ -168,17 +168,17 @@ class ProfileApi( * - 404: Recipient is not a registered Signal user * - 429: Rate-limited */ - fun getUnversionedProfile(serviceId: ServiceId, sealedSenderAccess: SealedSenderAccess?): NetworkResult { + suspend fun getUnversionedProfile(serviceId: ServiceId, sealedSenderAccess: SealedSenderAccess?): NetworkResult { val request = WebSocketRequestMessage.get("/v1/profile/$serviceId") val converter = NetworkResult.DefaultWebSocketConverter(SignalServiceProfile::class) return if (sealedSenderAccess == null) { - NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } + NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } } else { - NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) } - .fallback( + NetworkResult.fromWebSocketSuspend(converter) { unauthWebSocket.requestSuspend(request, sealedSenderAccess) } + .fallbackSuspend( predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, - fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } } + fallback = { NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } } ) } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt index 5d1d1bd57c..bf1648dd6a 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt @@ -21,13 +21,16 @@ import kotlinx.coroutines.launch import org.signal.core.util.logging.Log import org.signal.core.util.orNull import org.signal.libsignal.internal.CompletableFuture +import org.signal.libsignal.net.AuthenticatedChatConnection import org.signal.libsignal.net.BadRequestError +import org.signal.libsignal.net.ChatConnection import org.signal.libsignal.net.RequestResult import org.signal.libsignal.net.UnauthenticatedChatConnection import org.whispersystems.signalservice.api.crypto.SealedSenderAccess import org.whispersystems.signalservice.api.messages.EnvelopeResponse import org.whispersystems.signalservice.api.util.SleepTimer import org.whispersystems.signalservice.internal.push.Envelope +import org.whispersystems.signalservice.internal.util.awaitRequest import org.whispersystems.signalservice.internal.websocket.WebSocketConnection import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage @@ -176,21 +179,34 @@ sealed class SignalWebSocket( } } + /** + * Coroutine-friendly variant of [request]. + */ + suspend fun requestSuspend(request: WebSocketRequestMessage, timeout: Duration = WebSocketConnection.DEFAULT_SEND_TIMEOUT): WebsocketResponse { + restartDelayedDisconnectIfNecessary() + return getWebSocket().sendRequestSuspend(request, timeout) + } + @Throws(IOException::class) fun sendAck(response: EnvelopeResponse) { getWebSocket().sendResponse(response.websocketRequest.getWebSocketResponse()) } /** - * Executes the given callback with the underlying libsignal chat connection when available. - * - * This is only supported for LibSignal-based connections. - * - * @param callback The callback to execute with the connection. Should be very quick and - * non-blocking, because it may block other operations on that connection. + * Issues a libsignal future-returning request on the chat connection, awaits the result, + * and converts any failure into a [RequestResult] error variant. */ - suspend fun runWithChatConnection(callback: (org.signal.libsignal.net.ChatConnection) -> T): T { - return getWebSocket().runWithChatConnection(callback) + protected suspend fun runCatchingWithChatConnectionInternal( + callback: (ChatConnection) -> CompletableFuture> + ): RequestResult { + return try { + val future = getWebSocket().runWithChatConnection(callback) + future.awaitRequest() + } catch (e: kotlinx.coroutines.CancellationException) { + throw e + } catch (throwable: Throwable) { + throwable.toNetworkRequestResult() + } } @Synchronized @@ -308,27 +324,33 @@ sealed class SignalWebSocket( } } - suspend fun runCatchingWithUnauthChatConnection( - callback: (UnauthenticatedChatConnection) -> CompletableFuture> - ): CompletableFuture> { - val requestFuture = try { - getWebSocket().runWithChatConnection { chatConnection -> - val unauthenticatedConnection = chatConnection as? UnauthenticatedChatConnection - ?: throw IllegalStateException("Expected unauthenticated chat connection but got ${chatConnection::class.java.simpleName}") - callback(unauthenticatedConnection) - } - } catch (throwable: Throwable) { - return CompletableFuture.completedFuture(throwable.toNetworkRequestResult()) + /** + * Coroutine-friendly variant of the sealed-sender [request]. + */ + suspend fun requestSuspend(requestMessage: WebSocketRequestMessage, sealedSenderAccess: SealedSenderAccess): WebsocketResponse { + val headers: MutableList = requestMessage.headers.toMutableList() + if (sealedSenderAccess.applyHeader()) { + headers.add(sealedSenderAccess.header) } - return requestFuture.handle { result, throwable -> - when { - throwable != null -> throwable.toNetworkRequestResult() - result != null -> result - else -> RequestResult.ApplicationError(IllegalStateException("RequestResult was null")) + val message = requestMessage + .newBuilder() + .headers(headers) + .build() + + val response = requestSuspend(message) + if (response.status == 401) { + val fallback = sealedSenderAccess.switchToFallback() + if (fallback != null) { + return requestSuspend(requestMessage, fallback) } } + return response } + + suspend fun runCatchingWithChatConnection( + callback: (UnauthenticatedChatConnection) -> CompletableFuture> + ): RequestResult = runCatchingWithChatConnectionInternal { callback(it as UnauthenticatedChatConnection) } } /** @@ -336,6 +358,10 @@ sealed class SignalWebSocket( */ class AuthenticatedWebSocket(connectionFactory: WebSocketFactory, canConnect: CanConnect, sleepTimer: SleepTimer, disconnectTimeoutMs: Long) : SignalWebSocket(connectionFactory, canConnect, sleepTimer, disconnectTimeoutMs.milliseconds) { + suspend fun runCatchingWithChatConnection( + callback: (AuthenticatedChatConnection) -> CompletableFuture> + ): RequestResult = runCatchingWithChatConnectionInternal { callback(it as AuthenticatedChatConnection) } + /** * The reads a batch of messages off of the websocket. * diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/util/CompletableFutureExtensions.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/util/CompletableFutureExtensions.kt index aee321e81e..fe02ba8751 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/util/CompletableFutureExtensions.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/util/CompletableFutureExtensions.kt @@ -5,7 +5,12 @@ package org.whispersystems.signalservice.internal.util +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.isActive import org.signal.libsignal.internal.CompletableFuture +import org.signal.libsignal.internal.await +import java.net.SocketException +import kotlin.coroutines.cancellation.CancellationException /** * A Kotlin friendly adapter for [org.signal.libsignal.internal.CompletableFuture.whenComplete] @@ -27,3 +32,15 @@ fun CompletableFuture.whenComplete( } } } + +/** + * Awaits a libsignal-net [CompletableFuture] with cancellation-provenance guarding. + */ +suspend fun CompletableFuture.awaitRequest(): T = try { + await() +} catch (e: CancellationException) { + if (currentCoroutineContext().isActive) { + throw SocketException("Future cancelled by transport").apply { initCause(e) } + } + throw e +} diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt index e1ac4db1af..d69afe7942 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt @@ -29,12 +29,12 @@ import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulRespons import org.whispersystems.signalservice.api.util.CredentialsProvider import org.whispersystems.signalservice.api.websocket.HealthMonitor import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState +import org.whispersystems.signalservice.internal.util.awaitRequest import org.whispersystems.signalservice.internal.util.whenComplete import java.io.IOException import java.net.SocketException import java.time.Instant import java.util.Optional -import java.util.concurrent.CancellationException import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import java.util.concurrent.LinkedBlockingQueue @@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock +import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.time.Duration @@ -264,6 +265,7 @@ class LibSignalChatConnection( pendingCallbacks.clear() } + else -> { Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}") disconnect() @@ -292,9 +294,11 @@ class LibSignalChatConnection( is DeviceDeregisteredException -> { state.onNext(WebSocketConnectionState.AUTHENTICATION_FAILED) } + is AppExpiredException -> { state.onNext(WebSocketConnectionState.REMOTE_DEPRECATED) } + else -> { Log.w(TAG, "Unknown connection failure reason", throwable) state.onNext(WebSocketConnectionState.FAILED) @@ -394,10 +398,12 @@ class LibSignalChatConnection( ) single } + WebSocketConnectionState.CONNECTED -> { sendRequestInternal(request, timeoutSeconds, single) single } + else -> { throw IllegalStateException("LibSignalChatConnection.state was neither dead, CONNECTING, or CONNECTED.") } @@ -405,6 +411,31 @@ class LibSignalChatConnection( } } + /** + * Coroutine-friendly version of [sendRequest] by mirroring [sendRequestInternal]. + */ + override suspend fun sendRequestSuspend(request: WebSocketRequestMessage, timeout: Duration): WebsocketResponse { + val (future, isUnidentified) = runWithChatConnection { connection -> + connection.send(request.toLibSignalRequest(timeout)) to (connection is UnauthenticatedChatConnection) + } + + val response: ChatConnection.Response = try { + future.awaitRequest() + } catch (e: CancellationException) { + throw e + } catch (_: ConnectionInvalidatedException) { + throw NonSuccessfulResponseCodeException(4401) + } catch (e: Throwable) { + Log.w(TAG, "$name [sendRequestSuspend] Failure:", e) + throw SocketException("Failed to get response for request").apply { initCause(e) } + } + + if (response.status in 400..599) { + healthMonitor.onMessageError(status = response.status, isIdentifiedWebSocket = !isUnidentified) + } + return response.toWebsocketResponse(isUnidentified = isUnidentified) + } + override fun sendKeepAlive() { CHAT_SERVICE_LOCK.withLock { if (isDead()) { @@ -523,7 +554,8 @@ class LibSignalChatConnection( // This condition variable is created from CHAT_SERVICE_LOCK, and thus releases CHAT_SERVICE_LOCK // while we await the condition variable. stateChangedOrMessageReceivedCondition.await(remainingTimeoutMillis, TimeUnit.MILLISECONDS) - } catch (_: InterruptedException) { } + } catch (_: InterruptedException) { + } val elapsedTimeMillis = System.currentTimeMillis() - startTime remainingTimeoutMillis = timeoutMillis - elapsedTimeMillis } @@ -598,6 +630,7 @@ class LibSignalChatConnection( } } } + else -> { continuation.resumeWithException(IOException("WebSocket is not connected (state: ${state.value})")) } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt index 79aae9cae4..92f0bdf20e 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt @@ -6,6 +6,7 @@ import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState import java.io.IOException import java.util.Optional import java.util.concurrent.TimeoutException +import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds /** @@ -43,6 +44,10 @@ interface WebSocketConnection { @Throws(IOException::class) fun sendRequest(request: WebSocketRequestMessage, timeoutSeconds: Long): Single + suspend fun sendRequestSuspend(request: WebSocketRequestMessage, timeout: Duration = DEFAULT_SEND_TIMEOUT): WebsocketResponse { + throw UnsupportedOperationException("This connection does not support suspend sendRequest") + } + @Throws(IOException::class) fun sendKeepAlive() diff --git a/lib/network/src/main/java/org/signal/network/api/UsernameApi.kt b/lib/network/src/main/java/org/signal/network/api/UsernameApi.kt index a9f80ad247..8408a2c2e8 100644 --- a/lib/network/src/main/java/org/signal/network/api/UsernameApi.kt +++ b/lib/network/src/main/java/org/signal/network/api/UsernameApi.kt @@ -10,7 +10,6 @@ import org.signal.core.models.ServiceId import org.signal.libsignal.net.LookUpUsernameLinkFailure import org.signal.libsignal.net.RequestResult import org.signal.libsignal.net.UnauthUsernamesService -import org.signal.libsignal.net.getOrError import org.signal.libsignal.usernames.Username import org.whispersystems.signalservice.api.websocket.SignalWebSocket import java.util.UUID @@ -29,9 +28,9 @@ class UsernameApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWe */ fun getAciByUsername(username: Username): RequestResult { return runBlocking { - unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection -> + unauthWebSocket.runCatchingWithChatConnection { chatConnection -> UnauthUsernamesService(chatConnection).lookUpUsernameHash(username.hash) - }.getOrError().map { it?.let { ServiceId.ACI.fromLibSignal(it) } } + }.map { it?.let { ServiceId.ACI.fromLibSignal(it) } } } } @@ -43,9 +42,9 @@ class UsernameApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWe */ fun getDecryptedUsernameFromLinkServerIdAndEntropy(serverId: UUID, entropy: ByteArray): RequestResult { return runBlocking { - unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection -> + unauthWebSocket.runCatchingWithChatConnection { chatConnection -> UnauthUsernamesService(chatConnection).lookUpUsernameLink(serverId, entropy) - }.getOrError() + } } } }