diff --git a/app/src/main/java/org/thoughtcrime/securesms/profiles/manage/UsernameRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/profiles/manage/UsernameRepository.kt index d4361c9ecc..ff648e7eac 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/profiles/manage/UsernameRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/profiles/manage/UsernameRepository.kt @@ -8,6 +8,7 @@ import org.signal.core.util.Result import org.signal.core.util.Result.Companion.failure import org.signal.core.util.Result.Companion.success import org.signal.core.util.logging.Log +import org.signal.libsignal.net.RequestResult import org.signal.libsignal.usernames.BaseUsernameException import org.signal.libsignal.usernames.Username import org.thoughtcrime.securesms.components.settings.app.usernamelinks.main.UsernameLinkResetResult @@ -255,20 +256,18 @@ object UsernameRepository { } when (val result = SignalNetwork.username.getAciByUsername(username)) { - is NetworkResult.Success -> UsernameLinkConversionResult.Success(username, result.result) - is NetworkResult.StatusCodeError -> { - Log.w(TAG, "[convertLinkToUsername] Failed to lookup user.", result.exception) - when (result.code) { - 404 -> UsernameLinkConversionResult.NotFound(username) - 422 -> UsernameLinkConversionResult.Invalid - else -> UsernameLinkConversionResult.NetworkError - } + is RequestResult.Success -> { + result.result?.let { + UsernameLinkConversionResult.Success(username, it) + } ?: UsernameLinkConversionResult.NotFound(username) } - is NetworkResult.NetworkError -> { - Log.w(TAG, "[convertLinkToUsername] Failed to lookup user.", result.exception) + is RequestResult.RetryableNetworkError -> { UsernameLinkConversionResult.NetworkError } - is NetworkResult.ApplicationError -> throw result.throwable + is RequestResult.NonSuccess -> { + throw AssertionError() + } + is RequestResult.ApplicationError -> throw result.cause } } .subscribeOn(Schedulers.io()) @@ -284,21 +283,18 @@ object UsernameRepository { } return when (val result = SignalNetwork.username.getAciByUsername(username)) { - is NetworkResult.Success -> UsernameAciFetchResult.Success(result.result) - is NetworkResult.StatusCodeError -> { - Log.w(TAG, "[fetchAciFromUsername] Failed to get ACI for username hash", result.exception) - when (result.code) { - 404 -> UsernameAciFetchResult.NotFound - else -> UsernameAciFetchResult.NetworkError - } + is RequestResult.Success -> { + result.result?.let { + UsernameAciFetchResult.Success(it) + } ?: UsernameAciFetchResult.NotFound } - - is NetworkResult.NetworkError -> { - Log.w(TAG, "[fetchAciFromUsername] Hit network error while trying to resolve ACI from username", result.exception) + is RequestResult.NonSuccess -> { + throw AssertionError() + } + is RequestResult.RetryableNetworkError -> { UsernameAciFetchResult.NetworkError } - - is NetworkResult.ApplicationError -> throw result.throwable + is RequestResult.ApplicationError -> throw result.cause } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/username/UsernameApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/username/UsernameApi.kt index 1d0db71dfe..070a20ad14 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/username/UsernameApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/username/UsernameApi.kt @@ -5,14 +5,17 @@ package org.whispersystems.signalservice.api.username +import kotlinx.coroutines.runBlocking import org.signal.core.util.Base64 +import org.signal.libsignal.net.RequestResult +import org.signal.libsignal.net.UnauthUsernamesService +import org.signal.libsignal.net.getOrError import org.signal.libsignal.usernames.Username import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.account.AccountApi import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.websocket.SignalWebSocket import org.whispersystems.signalservice.internal.get -import org.whispersystems.signalservice.internal.push.GetAciByUsernameResponse import org.whispersystems.signalservice.internal.push.GetUsernameFromLinkResponseBody import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage import java.util.UUID @@ -25,18 +28,13 @@ class UsernameApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWe /** * Gets the ACI for the given [username], if it exists. This is an unauthenticated request. - * - * GET /v1/accounts/username_hash/[Username.getHash] - * - 200: Success - * - 400: Request must not be authenticated - * - 404: Hash is not associated with an account */ - fun getAciByUsername(username: Username): NetworkResult { - val usernameHash = Base64.encodeUrlSafeWithoutPadding(username.hash) - val request = WebSocketRequestMessage.get("/v1/accounts/username_hash/$usernameHash") - - return NetworkResult.fromWebSocketRequest(unauthWebSocket, request, GetAciByUsernameResponse::class) - .map { ServiceId.ACI.from(UUID.fromString(it.uuid)) } + fun getAciByUsername(username: Username): RequestResult { + return runBlocking { + unauthWebSocket.runWithUnauthChatConnection { chatConnection -> + UnauthUsernamesService(chatConnection).lookUpUsernameHash(username.hash) + }.getOrError().map { it?.let { ServiceId.ACI.fromLibSignal(it) } } + } } /** diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt index f80a9c01c1..55044aa280 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/SignalWebSocket.kt @@ -14,6 +14,7 @@ import io.reactivex.rxjava3.schedulers.Schedulers import io.reactivex.rxjava3.subjects.BehaviorSubject import org.signal.core.util.logging.Log import org.signal.core.util.orNull +import org.signal.libsignal.net.ChatConnection import org.whispersystems.signalservice.api.crypto.SealedSenderAccess import org.whispersystems.signalservice.api.messages.EnvelopeResponse import org.whispersystems.signalservice.api.util.SleepTimer @@ -168,6 +169,18 @@ sealed class SignalWebSocket( getWebSocket().sendResponse(response.websocketRequest.getWebSocketResponse()) } + /** + * Executes the given callback with the underlying libsignal chat connection when available. + * + * This is only supported for LibSignal-based connections. + * + * @param callback The callback to execute with the connection. Should be very quick and + * non-blocking, because it may block other operations on that connection. + */ + suspend fun runWithChatConnection(callback: (org.signal.libsignal.net.ChatConnection) -> T): T { + return getWebSocket().runWithChatConnection(callback) + } + @Synchronized @Throws(WebSocketUnavailableException::class) protected fun getWebSocket(): WebSocketConnection { @@ -311,6 +324,10 @@ sealed class SignalWebSocket( return Single.error(e) } } + + suspend fun runWithUnauthChatConnection(callback: (org.signal.libsignal.net.UnauthenticatedChatConnection) -> T): T { + return getWebSocket().runWithChatConnection(callback as (ChatConnection) -> T) + } } /** diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt index e58c9c29b4..7bfa104053 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt @@ -10,6 +10,8 @@ import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.schedulers.Schedulers import io.reactivex.rxjava3.subjects.BehaviorSubject import io.reactivex.rxjava3.subjects.SingleSubject +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.suspendCancellableCoroutine import okio.ByteString import okio.ByteString.Companion.toByteString import org.signal.core.util.logging.Log @@ -41,6 +43,8 @@ import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds import org.signal.libsignal.net.ChatConnection.Request as LibSignalRequest @@ -81,10 +85,9 @@ class LibSignalChatConnection( private val nextIncomingMessageInternalPseudoId = AtomicLong(1) val ackSenderForInternalPseudoId = ConcurrentHashMap() - private data class RequestAwaitingConnection( - val request: WebSocketRequestMessage, - val timeoutSeconds: Long, - val single: SingleSubject + private data class PendingAction( + val onConnectionSuccess: (ChatConnection) -> Unit, + val onFailure: (Throwable) -> Unit ) // CHAT_SERVICE_LOCK: Protects state, stateChangedOrMessageReceivedCondition, chatConnection, @@ -98,9 +101,9 @@ class LibSignalChatConnection( private var chatConnection: ChatConnection? = null private var chatConnectionFuture: CompletableFuture? = null - // requestsAwaitingConnection should only have contents when we are transitioning to, out of, or are + // pendingCallbacks should only have contents when we are transitioning to, out of, or are // in the CONNECTING state. - private val requestsAwaitingConnection = mutableListOf() + private val pendingCallbacks = mutableListOf() companion object { const val SERVICE_ENVELOPE_REQUEST_VERB = "PUT" @@ -162,14 +165,14 @@ class LibSignalChatConnection( // There's no sense in resetting nextIncomingMessageInternalPseudoId. // This is a belt-and-suspenders check, because the transition handler leaving the CONNECTING - // state should always cleanup the requestsAwaitingConnection, but in case we miss one, log it + // state should always cleanup the pendingCallbacks, but in case we miss one, log it // as an error and clean it up gracefully - if (requestsAwaitingConnection.isNotEmpty()) { - Log.w(TAG, "$name [cleanup] ${requestsAwaitingConnection.size} requestsAwaitingConnection during cleanup! This is probably a bug.") - requestsAwaitingConnection.forEach { pending -> - pending.single.onError(SocketException("Connection terminated unexpectedly")) + if (pendingCallbacks.isNotEmpty()) { + Log.w(TAG, "$name [cleanup] ${pendingCallbacks.size} pendingCallbacks during cleanup! This is probably a bug.") + pendingCallbacks.forEach { pending -> + pending.onFailure(SocketException("Connection terminated unexpectedly")) } - requestsAwaitingConnection.clear() + pendingCallbacks.clear() } } @@ -251,16 +254,15 @@ class LibSignalChatConnection( Log.i(TAG, "$name Connected") state.onNext(WebSocketConnectionState.CONNECTED) - requestsAwaitingConnection.forEach { pending -> - runCatching { - sendRequestInternal(pending.request, pending.timeoutSeconds, pending.single) - }.onFailure { e -> - Log.w(TAG, "$name [sendRequest] Failed to send pending request", e) - pending.single.onError(SocketException("Closed unexpectedly")) - } + pendingCallbacks.forEach { pending -> + runCatching { pending.onConnectionSuccess(connection) } + .onFailure { e -> + Log.w(TAG, "$name [handleConnectionSuccess] Failed to execute pending action", e) + pending.onFailure(e) + } } - requestsAwaitingConnection.clear() + pendingCallbacks.clear() } else -> { Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}") @@ -306,10 +308,10 @@ class LibSignalChatConnection( else -> SocketException("Closed unexpectedly") } - requestsAwaitingConnection.forEach { pending -> - pending.single.onError(downstreamThrowable) + pendingCallbacks.forEach { pending -> + pending.onFailure(downstreamThrowable) } - requestsAwaitingConnection.clear() + pendingCallbacks.clear() } } @@ -379,7 +381,12 @@ class LibSignalChatConnection( return when (state.value) { WebSocketConnectionState.CONNECTING -> { Log.i(TAG, "[sendRequest] Enqueuing request send for after connection") - requestsAwaitingConnection.add(RequestAwaitingConnection(request, timeoutSeconds, single)) + pendingCallbacks.add( + PendingAction( + onConnectionSuccess = { _ -> sendRequestInternal(request, timeoutSeconds, single) }, + onFailure = { error -> single.onError(error) } + ) + ) single } WebSocketConnectionState.CONNECTED -> { @@ -546,6 +553,53 @@ class LibSignalChatConnection( } } + @OptIn(InternalCoroutinesApi::class) + override suspend fun runWithChatConnection(callback: (ChatConnection) -> T): T = suspendCancellableCoroutine { continuation -> + CHAT_SERVICE_LOCK.withLock { + when (state.value) { + WebSocketConnectionState.CONNECTED -> { + try { + val result = callback(chatConnection!!) + continuation.resume(result) + } catch (e: Exception) { + continuation.resumeWithException(e) + } + } + + WebSocketConnectionState.CONNECTING -> { + val action = PendingAction( + onConnectionSuccess = { connection -> + CHAT_SERVICE_LOCK.withLock { + try { + val result = callback(connection) + // NB: We use the experimental tryResume* methods here to avoid crashing if the continuation is + // canceled before we finish the connection attempt, but the PendingAction cannot be removed from + // pendingActions before we get to executing it. + continuation.tryResume(result)?.let(continuation::completeResume) + } catch (e: Throwable) { + continuation.tryResumeWithException(e)?.let(continuation::completeResume) + } + } + }, + onFailure = { error -> + continuation.tryResumeWithException(error)?.let(continuation::completeResume) + } + ) + pendingCallbacks.add(action) + + continuation.invokeOnCancellation { + CHAT_SERVICE_LOCK.withLock { + pendingCallbacks.removeIf { it === action } + } + } + } + else -> { + continuation.resumeWithException(IOException("WebSocket is not connected (state: ${state.value})")) + } + } + } + } + private val listener = LibSignalChatListener() private inner class LibSignalChatListener : ChatConnectionListener { diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt index 0564da44a4..5977eceaf3 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.kt @@ -46,4 +46,17 @@ interface WebSocketConnection { @Throws(IOException::class) fun sendResponse(response: WebSocketResponseMessage) + + /** + * Executes the given callback with the underlying chat connection when it becomes available. + * This is specifically for LibSignal-based connections to access the native connection. + * + * @param callback Function to execute with the chat connection + * @return The result of the callback + * @throws UnsupportedOperationException if this connection doesn't support chat connection access + */ + suspend fun runWithChatConnection(callback: (org.signal.libsignal.net.ChatConnection) -> T): T { + // Default implementation for non-LibSignal connections + throw UnsupportedOperationException("This connection does not support chat connection access") + } }