Switch lookupUsernameHash to use libsignal's typed API wrapper.

This commit is contained in:
andrew-signal
2025-09-17 11:10:30 -04:00
committed by Greyson Parrelli
parent 16d6e98355
commit 957ddc82b5
5 changed files with 137 additions and 59 deletions

View File

@@ -8,6 +8,7 @@ import org.signal.core.util.Result
import org.signal.core.util.Result.Companion.failure import org.signal.core.util.Result.Companion.failure
import org.signal.core.util.Result.Companion.success import org.signal.core.util.Result.Companion.success
import org.signal.core.util.logging.Log import org.signal.core.util.logging.Log
import org.signal.libsignal.net.RequestResult
import org.signal.libsignal.usernames.BaseUsernameException import org.signal.libsignal.usernames.BaseUsernameException
import org.signal.libsignal.usernames.Username import org.signal.libsignal.usernames.Username
import org.thoughtcrime.securesms.components.settings.app.usernamelinks.main.UsernameLinkResetResult import org.thoughtcrime.securesms.components.settings.app.usernamelinks.main.UsernameLinkResetResult
@@ -255,20 +256,18 @@ object UsernameRepository {
} }
when (val result = SignalNetwork.username.getAciByUsername(username)) { when (val result = SignalNetwork.username.getAciByUsername(username)) {
is NetworkResult.Success -> UsernameLinkConversionResult.Success(username, result.result) is RequestResult.Success -> {
is NetworkResult.StatusCodeError -> { result.result?.let {
Log.w(TAG, "[convertLinkToUsername] Failed to lookup user.", result.exception) UsernameLinkConversionResult.Success(username, it)
when (result.code) { } ?: UsernameLinkConversionResult.NotFound(username)
404 -> UsernameLinkConversionResult.NotFound(username)
422 -> UsernameLinkConversionResult.Invalid
else -> UsernameLinkConversionResult.NetworkError
}
} }
is NetworkResult.NetworkError -> { is RequestResult.RetryableNetworkError -> {
Log.w(TAG, "[convertLinkToUsername] Failed to lookup user.", result.exception)
UsernameLinkConversionResult.NetworkError UsernameLinkConversionResult.NetworkError
} }
is NetworkResult.ApplicationError -> throw result.throwable is RequestResult.NonSuccess -> {
throw AssertionError()
}
is RequestResult.ApplicationError -> throw result.cause
} }
} }
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
@@ -284,21 +283,18 @@ object UsernameRepository {
} }
return when (val result = SignalNetwork.username.getAciByUsername(username)) { return when (val result = SignalNetwork.username.getAciByUsername(username)) {
is NetworkResult.Success -> UsernameAciFetchResult.Success(result.result) is RequestResult.Success -> {
is NetworkResult.StatusCodeError -> { result.result?.let {
Log.w(TAG, "[fetchAciFromUsername] Failed to get ACI for username hash", result.exception) UsernameAciFetchResult.Success(it)
when (result.code) { } ?: UsernameAciFetchResult.NotFound
404 -> UsernameAciFetchResult.NotFound
else -> UsernameAciFetchResult.NetworkError
}
} }
is RequestResult.NonSuccess -> {
is NetworkResult.NetworkError -> { throw AssertionError()
Log.w(TAG, "[fetchAciFromUsername] Hit network error while trying to resolve ACI from username", result.exception) }
is RequestResult.RetryableNetworkError -> {
UsernameAciFetchResult.NetworkError UsernameAciFetchResult.NetworkError
} }
is RequestResult.ApplicationError -> throw result.cause
is NetworkResult.ApplicationError -> throw result.throwable
} }
} }

View File

@@ -5,14 +5,17 @@
package org.whispersystems.signalservice.api.username package org.whispersystems.signalservice.api.username
import kotlinx.coroutines.runBlocking
import org.signal.core.util.Base64 import org.signal.core.util.Base64
import org.signal.libsignal.net.RequestResult
import org.signal.libsignal.net.UnauthUsernamesService
import org.signal.libsignal.net.getOrError
import org.signal.libsignal.usernames.Username import org.signal.libsignal.usernames.Username
import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.account.AccountApi import org.whispersystems.signalservice.api.account.AccountApi
import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.push.ServiceId
import org.whispersystems.signalservice.api.websocket.SignalWebSocket import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.internal.get import org.whispersystems.signalservice.internal.get
import org.whispersystems.signalservice.internal.push.GetAciByUsernameResponse
import org.whispersystems.signalservice.internal.push.GetUsernameFromLinkResponseBody import org.whispersystems.signalservice.internal.push.GetUsernameFromLinkResponseBody
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import java.util.UUID import java.util.UUID
@@ -25,18 +28,13 @@ class UsernameApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWe
/** /**
* Gets the ACI for the given [username], if it exists. This is an unauthenticated request. * Gets the ACI for the given [username], if it exists. This is an unauthenticated request.
*
* GET /v1/accounts/username_hash/[Username.getHash]
* - 200: Success
* - 400: Request must not be authenticated
* - 404: Hash is not associated with an account
*/ */
fun getAciByUsername(username: Username): NetworkResult<ServiceId.ACI> { fun getAciByUsername(username: Username): RequestResult<ServiceId.ACI?, Nothing> {
val usernameHash = Base64.encodeUrlSafeWithoutPadding(username.hash) return runBlocking {
val request = WebSocketRequestMessage.get("/v1/accounts/username_hash/$usernameHash") unauthWebSocket.runWithUnauthChatConnection { chatConnection ->
UnauthUsernamesService(chatConnection).lookUpUsernameHash(username.hash)
return NetworkResult.fromWebSocketRequest(unauthWebSocket, request, GetAciByUsernameResponse::class) }.getOrError().map { it?.let { ServiceId.ACI.fromLibSignal(it) } }
.map { ServiceId.ACI.from(UUID.fromString(it.uuid)) } }
} }
/** /**

View File

@@ -14,6 +14,7 @@ import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.BehaviorSubject import io.reactivex.rxjava3.subjects.BehaviorSubject
import org.signal.core.util.logging.Log import org.signal.core.util.logging.Log
import org.signal.core.util.orNull import org.signal.core.util.orNull
import org.signal.libsignal.net.ChatConnection
import org.whispersystems.signalservice.api.crypto.SealedSenderAccess import org.whispersystems.signalservice.api.crypto.SealedSenderAccess
import org.whispersystems.signalservice.api.messages.EnvelopeResponse import org.whispersystems.signalservice.api.messages.EnvelopeResponse
import org.whispersystems.signalservice.api.util.SleepTimer import org.whispersystems.signalservice.api.util.SleepTimer
@@ -168,6 +169,18 @@ sealed class SignalWebSocket(
getWebSocket().sendResponse(response.websocketRequest.getWebSocketResponse()) getWebSocket().sendResponse(response.websocketRequest.getWebSocketResponse())
} }
/**
* Executes the given callback with the underlying libsignal chat connection when available.
*
* This is only supported for LibSignal-based connections.
*
* @param callback The callback to execute with the connection. Should be very quick and
* non-blocking, because it may block other operations on that connection.
*/
suspend fun <T> runWithChatConnection(callback: (org.signal.libsignal.net.ChatConnection) -> T): T {
return getWebSocket().runWithChatConnection(callback)
}
@Synchronized @Synchronized
@Throws(WebSocketUnavailableException::class) @Throws(WebSocketUnavailableException::class)
protected fun getWebSocket(): WebSocketConnection { protected fun getWebSocket(): WebSocketConnection {
@@ -311,6 +324,10 @@ sealed class SignalWebSocket(
return Single.error(e) return Single.error(e)
} }
} }
suspend fun <T> runWithUnauthChatConnection(callback: (org.signal.libsignal.net.UnauthenticatedChatConnection) -> T): T {
return getWebSocket().runWithChatConnection(callback as (ChatConnection) -> T)
}
} }
/** /**

View File

@@ -10,6 +10,8 @@ import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.BehaviorSubject import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.SingleSubject import io.reactivex.rxjava3.subjects.SingleSubject
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.suspendCancellableCoroutine
import okio.ByteString import okio.ByteString
import okio.ByteString.Companion.toByteString import okio.ByteString.Companion.toByteString
import org.signal.core.util.logging.Log import org.signal.core.util.logging.Log
@@ -41,6 +43,8 @@ import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
import org.signal.libsignal.net.ChatConnection.Request as LibSignalRequest import org.signal.libsignal.net.ChatConnection.Request as LibSignalRequest
@@ -81,10 +85,9 @@ class LibSignalChatConnection(
private val nextIncomingMessageInternalPseudoId = AtomicLong(1) private val nextIncomingMessageInternalPseudoId = AtomicLong(1)
val ackSenderForInternalPseudoId = ConcurrentHashMap<Long, ChatConnectionListener.ServerMessageAck>() val ackSenderForInternalPseudoId = ConcurrentHashMap<Long, ChatConnectionListener.ServerMessageAck>()
private data class RequestAwaitingConnection( private data class PendingAction(
val request: WebSocketRequestMessage, val onConnectionSuccess: (ChatConnection) -> Unit,
val timeoutSeconds: Long, val onFailure: (Throwable) -> Unit
val single: SingleSubject<WebsocketResponse>
) )
// CHAT_SERVICE_LOCK: Protects state, stateChangedOrMessageReceivedCondition, chatConnection, // CHAT_SERVICE_LOCK: Protects state, stateChangedOrMessageReceivedCondition, chatConnection,
@@ -98,9 +101,9 @@ class LibSignalChatConnection(
private var chatConnection: ChatConnection? = null private var chatConnection: ChatConnection? = null
private var chatConnectionFuture: CompletableFuture<out ChatConnection>? = null private var chatConnectionFuture: CompletableFuture<out ChatConnection>? = null
// requestsAwaitingConnection should only have contents when we are transitioning to, out of, or are // pendingCallbacks should only have contents when we are transitioning to, out of, or are
// in the CONNECTING state. // in the CONNECTING state.
private val requestsAwaitingConnection = mutableListOf<RequestAwaitingConnection>() private val pendingCallbacks = mutableListOf<PendingAction>()
companion object { companion object {
const val SERVICE_ENVELOPE_REQUEST_VERB = "PUT" const val SERVICE_ENVELOPE_REQUEST_VERB = "PUT"
@@ -162,14 +165,14 @@ class LibSignalChatConnection(
// There's no sense in resetting nextIncomingMessageInternalPseudoId. // There's no sense in resetting nextIncomingMessageInternalPseudoId.
// This is a belt-and-suspenders check, because the transition handler leaving the CONNECTING // This is a belt-and-suspenders check, because the transition handler leaving the CONNECTING
// state should always cleanup the requestsAwaitingConnection, but in case we miss one, log it // state should always cleanup the pendingCallbacks, but in case we miss one, log it
// as an error and clean it up gracefully // as an error and clean it up gracefully
if (requestsAwaitingConnection.isNotEmpty()) { if (pendingCallbacks.isNotEmpty()) {
Log.w(TAG, "$name [cleanup] ${requestsAwaitingConnection.size} requestsAwaitingConnection during cleanup! This is probably a bug.") Log.w(TAG, "$name [cleanup] ${pendingCallbacks.size} pendingCallbacks during cleanup! This is probably a bug.")
requestsAwaitingConnection.forEach { pending -> pendingCallbacks.forEach { pending ->
pending.single.onError(SocketException("Connection terminated unexpectedly")) pending.onFailure(SocketException("Connection terminated unexpectedly"))
} }
requestsAwaitingConnection.clear() pendingCallbacks.clear()
} }
} }
@@ -251,16 +254,15 @@ class LibSignalChatConnection(
Log.i(TAG, "$name Connected") Log.i(TAG, "$name Connected")
state.onNext(WebSocketConnectionState.CONNECTED) state.onNext(WebSocketConnectionState.CONNECTED)
requestsAwaitingConnection.forEach { pending -> pendingCallbacks.forEach { pending ->
runCatching { runCatching { pending.onConnectionSuccess(connection) }
sendRequestInternal(pending.request, pending.timeoutSeconds, pending.single) .onFailure { e ->
}.onFailure { e -> Log.w(TAG, "$name [handleConnectionSuccess] Failed to execute pending action", e)
Log.w(TAG, "$name [sendRequest] Failed to send pending request", e) pending.onFailure(e)
pending.single.onError(SocketException("Closed unexpectedly")) }
}
} }
requestsAwaitingConnection.clear() pendingCallbacks.clear()
} }
else -> { else -> {
Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}") Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}")
@@ -306,10 +308,10 @@ class LibSignalChatConnection(
else -> SocketException("Closed unexpectedly") else -> SocketException("Closed unexpectedly")
} }
requestsAwaitingConnection.forEach { pending -> pendingCallbacks.forEach { pending ->
pending.single.onError(downstreamThrowable) pending.onFailure(downstreamThrowable)
} }
requestsAwaitingConnection.clear() pendingCallbacks.clear()
} }
} }
@@ -379,7 +381,12 @@ class LibSignalChatConnection(
return when (state.value) { return when (state.value) {
WebSocketConnectionState.CONNECTING -> { WebSocketConnectionState.CONNECTING -> {
Log.i(TAG, "[sendRequest] Enqueuing request send for after connection") Log.i(TAG, "[sendRequest] Enqueuing request send for after connection")
requestsAwaitingConnection.add(RequestAwaitingConnection(request, timeoutSeconds, single)) pendingCallbacks.add(
PendingAction(
onConnectionSuccess = { _ -> sendRequestInternal(request, timeoutSeconds, single) },
onFailure = { error -> single.onError(error) }
)
)
single single
} }
WebSocketConnectionState.CONNECTED -> { WebSocketConnectionState.CONNECTED -> {
@@ -546,6 +553,53 @@ class LibSignalChatConnection(
} }
} }
@OptIn(InternalCoroutinesApi::class)
override suspend fun <T> runWithChatConnection(callback: (ChatConnection) -> T): T = suspendCancellableCoroutine { continuation ->
CHAT_SERVICE_LOCK.withLock {
when (state.value) {
WebSocketConnectionState.CONNECTED -> {
try {
val result = callback(chatConnection!!)
continuation.resume(result)
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}
WebSocketConnectionState.CONNECTING -> {
val action = PendingAction(
onConnectionSuccess = { connection ->
CHAT_SERVICE_LOCK.withLock {
try {
val result = callback(connection)
// NB: We use the experimental tryResume* methods here to avoid crashing if the continuation is
// canceled before we finish the connection attempt, but the PendingAction cannot be removed from
// pendingActions before we get to executing it.
continuation.tryResume(result)?.let(continuation::completeResume)
} catch (e: Throwable) {
continuation.tryResumeWithException(e)?.let(continuation::completeResume)
}
}
},
onFailure = { error ->
continuation.tryResumeWithException(error)?.let(continuation::completeResume)
}
)
pendingCallbacks.add(action)
continuation.invokeOnCancellation {
CHAT_SERVICE_LOCK.withLock {
pendingCallbacks.removeIf { it === action }
}
}
}
else -> {
continuation.resumeWithException(IOException("WebSocket is not connected (state: ${state.value})"))
}
}
}
}
private val listener = LibSignalChatListener() private val listener = LibSignalChatListener()
private inner class LibSignalChatListener : ChatConnectionListener { private inner class LibSignalChatListener : ChatConnectionListener {

View File

@@ -46,4 +46,17 @@ interface WebSocketConnection {
@Throws(IOException::class) @Throws(IOException::class)
fun sendResponse(response: WebSocketResponseMessage) fun sendResponse(response: WebSocketResponseMessage)
/**
* Executes the given callback with the underlying chat connection when it becomes available.
* This is specifically for LibSignal-based connections to access the native connection.
*
* @param callback Function to execute with the chat connection
* @return The result of the callback
* @throws UnsupportedOperationException if this connection doesn't support chat connection access
*/
suspend fun <T> runWithChatConnection(callback: (org.signal.libsignal.net.ChatConnection) -> T): T {
// Default implementation for non-LibSignal connections
throw UnsupportedOperationException("This connection does not support chat connection access")
}
} }