mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-05-21 07:40:10 +01:00
Make network calling infrastructure more coroutine friendly.
This commit is contained in:
committed by
Michelle Tang
parent
39529af4e9
commit
43a1c93961
@@ -3,7 +3,6 @@ package org.thoughtcrime.securesms.dependencies
|
||||
import org.signal.libsignal.keytrans.KeyTransparencyException
|
||||
import org.signal.libsignal.net.KeyTransparency.CheckMode
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.getOrError
|
||||
import org.signal.libsignal.protocol.IdentityKey
|
||||
import org.signal.libsignal.protocol.ServiceId
|
||||
import org.thoughtcrime.securesms.database.model.KeyTransparencyStore
|
||||
@@ -15,8 +14,8 @@ import org.whispersystems.signalservice.api.websocket.SignalWebSocket
|
||||
class KeyTransparencyApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket) {
|
||||
|
||||
suspend fun check(checkMode: CheckMode, aci: ServiceId.Aci, aciIdentityKey: IdentityKey, e164: String?, unidentifiedAccessKey: ByteArray?, usernameHash: ByteArray?, keyTransparencyStore: KeyTransparencyStore): RequestResult<Unit, KeyTransparencyException> {
|
||||
return unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection ->
|
||||
return unauthWebSocket.runCatchingWithChatConnection { chatConnection ->
|
||||
chatConnection.keyTransparencyClient().check(checkMode, aci, aciIdentityKey, e164, unidentifiedAccessKey, usernameHash, keyTransparencyStore)
|
||||
}.getOrError()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+40
@@ -8,6 +8,7 @@ package org.whispersystems.signalservice.api
|
||||
import io.reactivex.rxjava3.core.Single
|
||||
import org.signal.core.util.concurrent.safeBlockingGet
|
||||
import org.whispersystems.signalservice.api.NetworkResult.ApplicationError
|
||||
import org.whispersystems.signalservice.api.NetworkResult.Companion.fromWebSocket
|
||||
import org.whispersystems.signalservice.api.NetworkResult.StatusCodeError
|
||||
import org.whispersystems.signalservice.api.push.exceptions.MalformedRequestException
|
||||
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
|
||||
@@ -150,6 +151,30 @@ sealed class NetworkResult<T>(
|
||||
return fromWebSocket(webSocketResponseConverter) { signalWebSocket.request(request, timeout) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Coroutine-friendly variant of the [fromWebSocket] overload that takes a [WebSocketResponseConverter].
|
||||
*/
|
||||
suspend fun <T> fromWebSocketSuspend(
|
||||
webSocketResponseConverter: WebSocketResponseConverter<T>,
|
||||
fetcher: suspend () -> WebsocketResponse
|
||||
): NetworkResult<T> {
|
||||
return try {
|
||||
webSocketResponseConverter.convert(fetcher())
|
||||
} catch (e: kotlinx.coroutines.CancellationException) {
|
||||
throw e
|
||||
} catch (e: NonSuccessfulResponseCodeException) {
|
||||
StatusCodeError(e)
|
||||
} catch (e: IOException) {
|
||||
NetworkError(e)
|
||||
} catch (e: TimeoutException) {
|
||||
NetworkError(PushNetworkException(e))
|
||||
} catch (e: InterruptedException) {
|
||||
NetworkError(PushNetworkException(e))
|
||||
} catch (e: Throwable) {
|
||||
ApplicationError(e)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a local operation, [block], that may throw an exception that should be wrapped in an [ApplicationError]
|
||||
* and abort downstream network requests that directly depend on the output of the local operation. Should
|
||||
@@ -327,6 +352,21 @@ sealed class NetworkResult<T>(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* See [fallback].
|
||||
*/
|
||||
suspend fun fallbackSuspend(predicate: (NetworkResult<T>) -> Boolean = { true }, fallback: suspend () -> NetworkResult<T>): NetworkResult<T> {
|
||||
if (this is Success) {
|
||||
return this
|
||||
}
|
||||
|
||||
return if (predicate(this)) {
|
||||
fallback()
|
||||
} else {
|
||||
this
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the output of one [NetworkResult] and passes it as the input to another if the operation is successful.
|
||||
* If it's non-successful, the [result] lambda is not run, and instead the original failure will be propagated.
|
||||
|
||||
+4
-13
@@ -8,10 +8,8 @@ package org.whispersystems.signalservice.api.attachment
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.libsignal.net.AuthMessagesService
|
||||
import org.signal.libsignal.net.AuthenticatedChatConnection
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.UploadTooLargeException
|
||||
import org.signal.libsignal.net.getOrError
|
||||
import org.whispersystems.signalservice.api.NetworkResult
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
|
||||
@@ -23,7 +21,6 @@ import org.whispersystems.signalservice.internal.push.PushAttachmentData
|
||||
import org.whispersystems.signalservice.internal.push.PushServiceSocket
|
||||
import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory
|
||||
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
|
||||
import java.io.IOException
|
||||
import java.io.InputStream
|
||||
import kotlin.jvm.optionals.getOrNull
|
||||
|
||||
@@ -43,12 +40,10 @@ class AttachmentApi(
|
||||
* Gets a v4 attachment upload form, which provides the necessary information to upload an attachment.
|
||||
*/
|
||||
fun getAttachmentV4UploadForm(uploadSizeBytes: Long): RequestResult<AttachmentUploadForm, UploadTooLargeException> {
|
||||
return try {
|
||||
runBlocking {
|
||||
authWebSocket.runWithChatConnection { chatConnection ->
|
||||
AuthMessagesService(chatConnection as AuthenticatedChatConnection).getUploadForm(uploadSizeBytes)
|
||||
}
|
||||
}.getOrError().map { form ->
|
||||
return runBlocking {
|
||||
authWebSocket.runCatchingWithChatConnection { chatConnection ->
|
||||
AuthMessagesService(chatConnection).getUploadForm(uploadSizeBytes)
|
||||
}.map { form ->
|
||||
AttachmentUploadForm(
|
||||
cdn = form.cdn,
|
||||
key = form.key,
|
||||
@@ -56,10 +51,6 @@ class AttachmentApi(
|
||||
signedUploadLocation = form.signedUploadUrl.toString()
|
||||
)
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
RequestResult.RetryableNetworkError(e)
|
||||
} catch (e: Throwable) {
|
||||
RequestResult.ApplicationError(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+2
-3
@@ -12,7 +12,6 @@ import org.signal.libsignal.net.MultiRecipientSendAuthorization
|
||||
import org.signal.libsignal.net.MultiRecipientSendFailure
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.UnauthMessagesService
|
||||
import org.signal.libsignal.net.getOrError
|
||||
import org.whispersystems.signalservice.api.NetworkResult
|
||||
import org.whispersystems.signalservice.api.crypto.SealedSenderAccess
|
||||
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
|
||||
@@ -79,9 +78,9 @@ class MessageApi(
|
||||
*/
|
||||
fun sendGroupMessage(body: ByteArray, auth: MultiRecipientSendAuthorization, timestamp: Long, online: Boolean, urgent: Boolean): RequestResult<MultiRecipientMessageResponse, MultiRecipientSendFailure> {
|
||||
return runBlocking {
|
||||
unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection ->
|
||||
unauthWebSocket.runCatchingWithChatConnection { chatConnection ->
|
||||
UnauthMessagesService(chatConnection).sendMultiRecipientMessage(body, timestamp, auth, online, urgent)
|
||||
}.getOrError()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+15
-15
@@ -115,7 +115,7 @@ class ProfileApi(
|
||||
* - 404: Recipient is not a registered Signal user
|
||||
* - 429: Rate-limited
|
||||
*/
|
||||
fun getVersionedProfileAndCredential(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult<Pair<SignalServiceProfile, ExpiringProfileKeyCredential?>> {
|
||||
suspend fun getVersionedProfileAndCredential(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult<Pair<SignalServiceProfile, ExpiringProfileKeyCredential?>> {
|
||||
val profileVersion = profileKey.getProfileKeyVersion(aci.libSignalAci).serialize()
|
||||
val profileRequestContext = clientZkProfileOperations.createProfileKeyCredentialRequestContext(SecureRandom(), aci.libSignalAci, profileKey)
|
||||
val serializedProfileRequest = Hex.toStringCondensed(profileRequestContext.request.serialize())
|
||||
@@ -124,12 +124,12 @@ class ProfileApi(
|
||||
val converter = ProfileAndCredentialResponseConverter(clientZkProfileOperations, profileRequestContext)
|
||||
|
||||
return if (sealedSenderAccess == null) {
|
||||
NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) }
|
||||
NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) }
|
||||
} else {
|
||||
NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) }
|
||||
.fallback(
|
||||
NetworkResult.fromWebSocketSuspend(converter) { unauthWebSocket.requestSuspend(request, sealedSenderAccess) }
|
||||
.fallbackSuspend(
|
||||
predicate = { it is NetworkResult.StatusCodeError && it.code == 401 },
|
||||
fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } }
|
||||
fallback = { NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } }
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -142,7 +142,7 @@ class ProfileApi(
|
||||
* - 404: Recipient is not a registered Signal user
|
||||
* - 429: Rate-limited
|
||||
*/
|
||||
fun getVersionedProfile(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult<SignalServiceProfile> {
|
||||
suspend fun getVersionedProfile(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult<SignalServiceProfile> {
|
||||
val profileKeyIdentifier = profileKey.getProfileKeyVersion(aci.libSignalAci)
|
||||
val profileVersion = profileKeyIdentifier.serialize()
|
||||
|
||||
@@ -150,12 +150,12 @@ class ProfileApi(
|
||||
val converter = NetworkResult.DefaultWebSocketConverter(SignalServiceProfile::class)
|
||||
|
||||
return if (sealedSenderAccess == null) {
|
||||
NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) }
|
||||
NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) }
|
||||
} else {
|
||||
NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) }
|
||||
.fallback(
|
||||
NetworkResult.fromWebSocketSuspend(converter) { unauthWebSocket.requestSuspend(request, sealedSenderAccess) }
|
||||
.fallbackSuspend(
|
||||
predicate = { it is NetworkResult.StatusCodeError && it.code == 401 },
|
||||
fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } }
|
||||
fallback = { NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } }
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -168,17 +168,17 @@ class ProfileApi(
|
||||
* - 404: Recipient is not a registered Signal user
|
||||
* - 429: Rate-limited
|
||||
*/
|
||||
fun getUnversionedProfile(serviceId: ServiceId, sealedSenderAccess: SealedSenderAccess?): NetworkResult<SignalServiceProfile> {
|
||||
suspend fun getUnversionedProfile(serviceId: ServiceId, sealedSenderAccess: SealedSenderAccess?): NetworkResult<SignalServiceProfile> {
|
||||
val request = WebSocketRequestMessage.get("/v1/profile/$serviceId")
|
||||
val converter = NetworkResult.DefaultWebSocketConverter(SignalServiceProfile::class)
|
||||
|
||||
return if (sealedSenderAccess == null) {
|
||||
NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) }
|
||||
NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) }
|
||||
} else {
|
||||
NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) }
|
||||
.fallback(
|
||||
NetworkResult.fromWebSocketSuspend(converter) { unauthWebSocket.requestSuspend(request, sealedSenderAccess) }
|
||||
.fallbackSuspend(
|
||||
predicate = { it is NetworkResult.StatusCodeError && it.code == 401 },
|
||||
fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } }
|
||||
fallback = { NetworkResult.fromWebSocketSuspend(converter) { authWebSocket.requestSuspend(request) } }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
+50
-24
@@ -21,13 +21,16 @@ import kotlinx.coroutines.launch
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.orNull
|
||||
import org.signal.libsignal.internal.CompletableFuture
|
||||
import org.signal.libsignal.net.AuthenticatedChatConnection
|
||||
import org.signal.libsignal.net.BadRequestError
|
||||
import org.signal.libsignal.net.ChatConnection
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.UnauthenticatedChatConnection
|
||||
import org.whispersystems.signalservice.api.crypto.SealedSenderAccess
|
||||
import org.whispersystems.signalservice.api.messages.EnvelopeResponse
|
||||
import org.whispersystems.signalservice.api.util.SleepTimer
|
||||
import org.whispersystems.signalservice.internal.push.Envelope
|
||||
import org.whispersystems.signalservice.internal.util.awaitRequest
|
||||
import org.whispersystems.signalservice.internal.websocket.WebSocketConnection
|
||||
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
|
||||
import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage
|
||||
@@ -176,21 +179,34 @@ sealed class SignalWebSocket(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Coroutine-friendly variant of [request].
|
||||
*/
|
||||
suspend fun requestSuspend(request: WebSocketRequestMessage, timeout: Duration = WebSocketConnection.DEFAULT_SEND_TIMEOUT): WebsocketResponse {
|
||||
restartDelayedDisconnectIfNecessary()
|
||||
return getWebSocket().sendRequestSuspend(request, timeout)
|
||||
}
|
||||
|
||||
@Throws(IOException::class)
|
||||
fun sendAck(response: EnvelopeResponse) {
|
||||
getWebSocket().sendResponse(response.websocketRequest.getWebSocketResponse())
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the given callback with the underlying libsignal chat connection when available.
|
||||
*
|
||||
* This is only supported for LibSignal-based connections.
|
||||
*
|
||||
* @param callback The callback to execute with the connection. Should be very quick and
|
||||
* non-blocking, because it may block other operations on that connection.
|
||||
* Issues a libsignal future-returning request on the chat connection, awaits the result,
|
||||
* and converts any failure into a [RequestResult] error variant.
|
||||
*/
|
||||
suspend fun <T> runWithChatConnection(callback: (org.signal.libsignal.net.ChatConnection) -> T): T {
|
||||
return getWebSocket().runWithChatConnection(callback)
|
||||
protected suspend fun <Result, Error : BadRequestError> runCatchingWithChatConnectionInternal(
|
||||
callback: (ChatConnection) -> CompletableFuture<RequestResult<Result, Error>>
|
||||
): RequestResult<Result, Error> {
|
||||
return try {
|
||||
val future = getWebSocket().runWithChatConnection(callback)
|
||||
future.awaitRequest()
|
||||
} catch (e: kotlinx.coroutines.CancellationException) {
|
||||
throw e
|
||||
} catch (throwable: Throwable) {
|
||||
throwable.toNetworkRequestResult()
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
@@ -308,27 +324,33 @@ sealed class SignalWebSocket(
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun <Result, Error : BadRequestError> runCatchingWithUnauthChatConnection(
|
||||
callback: (UnauthenticatedChatConnection) -> CompletableFuture<RequestResult<Result, Error>>
|
||||
): CompletableFuture<RequestResult<Result, Error>> {
|
||||
val requestFuture = try {
|
||||
getWebSocket().runWithChatConnection { chatConnection ->
|
||||
val unauthenticatedConnection = chatConnection as? UnauthenticatedChatConnection
|
||||
?: throw IllegalStateException("Expected unauthenticated chat connection but got ${chatConnection::class.java.simpleName}")
|
||||
callback(unauthenticatedConnection)
|
||||
}
|
||||
} catch (throwable: Throwable) {
|
||||
return CompletableFuture.completedFuture(throwable.toNetworkRequestResult())
|
||||
/**
|
||||
* Coroutine-friendly variant of the sealed-sender [request].
|
||||
*/
|
||||
suspend fun requestSuspend(requestMessage: WebSocketRequestMessage, sealedSenderAccess: SealedSenderAccess): WebsocketResponse {
|
||||
val headers: MutableList<String> = requestMessage.headers.toMutableList()
|
||||
if (sealedSenderAccess.applyHeader()) {
|
||||
headers.add(sealedSenderAccess.header)
|
||||
}
|
||||
|
||||
return requestFuture.handle { result, throwable ->
|
||||
when {
|
||||
throwable != null -> throwable.toNetworkRequestResult()
|
||||
result != null -> result
|
||||
else -> RequestResult.ApplicationError(IllegalStateException("RequestResult was null"))
|
||||
val message = requestMessage
|
||||
.newBuilder()
|
||||
.headers(headers)
|
||||
.build()
|
||||
|
||||
val response = requestSuspend(message)
|
||||
if (response.status == 401) {
|
||||
val fallback = sealedSenderAccess.switchToFallback()
|
||||
if (fallback != null) {
|
||||
return requestSuspend(requestMessage, fallback)
|
||||
}
|
||||
}
|
||||
return response
|
||||
}
|
||||
|
||||
suspend fun <Result, Error : BadRequestError> runCatchingWithChatConnection(
|
||||
callback: (UnauthenticatedChatConnection) -> CompletableFuture<RequestResult<Result, Error>>
|
||||
): RequestResult<Result, Error> = runCatchingWithChatConnectionInternal { callback(it as UnauthenticatedChatConnection) }
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -336,6 +358,10 @@ sealed class SignalWebSocket(
|
||||
*/
|
||||
class AuthenticatedWebSocket(connectionFactory: WebSocketFactory, canConnect: CanConnect, sleepTimer: SleepTimer, disconnectTimeoutMs: Long) : SignalWebSocket(connectionFactory, canConnect, sleepTimer, disconnectTimeoutMs.milliseconds) {
|
||||
|
||||
suspend fun <Result, Error : BadRequestError> runCatchingWithChatConnection(
|
||||
callback: (AuthenticatedChatConnection) -> CompletableFuture<RequestResult<Result, Error>>
|
||||
): RequestResult<Result, Error> = runCatchingWithChatConnectionInternal { callback(it as AuthenticatedChatConnection) }
|
||||
|
||||
/**
|
||||
* The reads a batch of messages off of the websocket.
|
||||
*
|
||||
|
||||
+17
@@ -5,7 +5,12 @@
|
||||
|
||||
package org.whispersystems.signalservice.internal.util
|
||||
|
||||
import kotlinx.coroutines.currentCoroutineContext
|
||||
import kotlinx.coroutines.isActive
|
||||
import org.signal.libsignal.internal.CompletableFuture
|
||||
import org.signal.libsignal.internal.await
|
||||
import java.net.SocketException
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
|
||||
/**
|
||||
* A Kotlin friendly adapter for [org.signal.libsignal.internal.CompletableFuture.whenComplete]
|
||||
@@ -27,3 +32,15 @@ fun <T> CompletableFuture<T>.whenComplete(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaits a libsignal-net [CompletableFuture] with cancellation-provenance guarding.
|
||||
*/
|
||||
suspend fun <T> CompletableFuture<T>.awaitRequest(): T = try {
|
||||
await()
|
||||
} catch (e: CancellationException) {
|
||||
if (currentCoroutineContext().isActive) {
|
||||
throw SocketException("Future cancelled by transport").apply { initCause(e) }
|
||||
}
|
||||
throw e
|
||||
}
|
||||
|
||||
+35
-2
@@ -29,12 +29,12 @@ import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulRespons
|
||||
import org.whispersystems.signalservice.api.util.CredentialsProvider
|
||||
import org.whispersystems.signalservice.api.websocket.HealthMonitor
|
||||
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
|
||||
import org.whispersystems.signalservice.internal.util.awaitRequest
|
||||
import org.whispersystems.signalservice.internal.util.whenComplete
|
||||
import java.io.IOException
|
||||
import java.net.SocketException
|
||||
import java.time.Instant
|
||||
import java.util.Optional
|
||||
import java.util.concurrent.CancellationException
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.resumeWithException
|
||||
import kotlin.time.Duration
|
||||
@@ -264,6 +265,7 @@ class LibSignalChatConnection(
|
||||
|
||||
pendingCallbacks.clear()
|
||||
}
|
||||
|
||||
else -> {
|
||||
Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}")
|
||||
disconnect()
|
||||
@@ -292,9 +294,11 @@ class LibSignalChatConnection(
|
||||
is DeviceDeregisteredException -> {
|
||||
state.onNext(WebSocketConnectionState.AUTHENTICATION_FAILED)
|
||||
}
|
||||
|
||||
is AppExpiredException -> {
|
||||
state.onNext(WebSocketConnectionState.REMOTE_DEPRECATED)
|
||||
}
|
||||
|
||||
else -> {
|
||||
Log.w(TAG, "Unknown connection failure reason", throwable)
|
||||
state.onNext(WebSocketConnectionState.FAILED)
|
||||
@@ -394,10 +398,12 @@ class LibSignalChatConnection(
|
||||
)
|
||||
single
|
||||
}
|
||||
|
||||
WebSocketConnectionState.CONNECTED -> {
|
||||
sendRequestInternal(request, timeoutSeconds, single)
|
||||
single
|
||||
}
|
||||
|
||||
else -> {
|
||||
throw IllegalStateException("LibSignalChatConnection.state was neither dead, CONNECTING, or CONNECTED.")
|
||||
}
|
||||
@@ -405,6 +411,31 @@ class LibSignalChatConnection(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Coroutine-friendly version of [sendRequest] by mirroring [sendRequestInternal].
|
||||
*/
|
||||
override suspend fun sendRequestSuspend(request: WebSocketRequestMessage, timeout: Duration): WebsocketResponse {
|
||||
val (future, isUnidentified) = runWithChatConnection { connection ->
|
||||
connection.send(request.toLibSignalRequest(timeout)) to (connection is UnauthenticatedChatConnection)
|
||||
}
|
||||
|
||||
val response: ChatConnection.Response = try {
|
||||
future.awaitRequest()
|
||||
} catch (e: CancellationException) {
|
||||
throw e
|
||||
} catch (_: ConnectionInvalidatedException) {
|
||||
throw NonSuccessfulResponseCodeException(4401)
|
||||
} catch (e: Throwable) {
|
||||
Log.w(TAG, "$name [sendRequestSuspend] Failure:", e)
|
||||
throw SocketException("Failed to get response for request").apply { initCause(e) }
|
||||
}
|
||||
|
||||
if (response.status in 400..599) {
|
||||
healthMonitor.onMessageError(status = response.status, isIdentifiedWebSocket = !isUnidentified)
|
||||
}
|
||||
return response.toWebsocketResponse(isUnidentified = isUnidentified)
|
||||
}
|
||||
|
||||
override fun sendKeepAlive() {
|
||||
CHAT_SERVICE_LOCK.withLock {
|
||||
if (isDead()) {
|
||||
@@ -523,7 +554,8 @@ class LibSignalChatConnection(
|
||||
// This condition variable is created from CHAT_SERVICE_LOCK, and thus releases CHAT_SERVICE_LOCK
|
||||
// while we await the condition variable.
|
||||
stateChangedOrMessageReceivedCondition.await(remainingTimeoutMillis, TimeUnit.MILLISECONDS)
|
||||
} catch (_: InterruptedException) { }
|
||||
} catch (_: InterruptedException) {
|
||||
}
|
||||
val elapsedTimeMillis = System.currentTimeMillis() - startTime
|
||||
remainingTimeoutMillis = timeoutMillis - elapsedTimeMillis
|
||||
}
|
||||
@@ -598,6 +630,7 @@ class LibSignalChatConnection(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else -> {
|
||||
continuation.resumeWithException(IOException("WebSocket is not connected (state: ${state.value})"))
|
||||
}
|
||||
|
||||
+5
@@ -6,6 +6,7 @@ import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
|
||||
import java.io.IOException
|
||||
import java.util.Optional
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
/**
|
||||
@@ -43,6 +44,10 @@ interface WebSocketConnection {
|
||||
@Throws(IOException::class)
|
||||
fun sendRequest(request: WebSocketRequestMessage, timeoutSeconds: Long): Single<WebsocketResponse>
|
||||
|
||||
suspend fun sendRequestSuspend(request: WebSocketRequestMessage, timeout: Duration = DEFAULT_SEND_TIMEOUT): WebsocketResponse {
|
||||
throw UnsupportedOperationException("This connection does not support suspend sendRequest")
|
||||
}
|
||||
|
||||
@Throws(IOException::class)
|
||||
fun sendKeepAlive()
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ import org.signal.core.models.ServiceId
|
||||
import org.signal.libsignal.net.LookUpUsernameLinkFailure
|
||||
import org.signal.libsignal.net.RequestResult
|
||||
import org.signal.libsignal.net.UnauthUsernamesService
|
||||
import org.signal.libsignal.net.getOrError
|
||||
import org.signal.libsignal.usernames.Username
|
||||
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
|
||||
import java.util.UUID
|
||||
@@ -29,9 +28,9 @@ class UsernameApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWe
|
||||
*/
|
||||
fun getAciByUsername(username: Username): RequestResult<ServiceId.ACI?, Nothing> {
|
||||
return runBlocking {
|
||||
unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection ->
|
||||
unauthWebSocket.runCatchingWithChatConnection { chatConnection ->
|
||||
UnauthUsernamesService(chatConnection).lookUpUsernameHash(username.hash)
|
||||
}.getOrError().map { it?.let { ServiceId.ACI.fromLibSignal(it) } }
|
||||
}.map { it?.let { ServiceId.ACI.fromLibSignal(it) } }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,9 +42,9 @@ class UsernameApi(private val unauthWebSocket: SignalWebSocket.UnauthenticatedWe
|
||||
*/
|
||||
fun getDecryptedUsernameFromLinkServerIdAndEntropy(serverId: UUID, entropy: ByteArray): RequestResult<Username?, LookUpUsernameLinkFailure> {
|
||||
return runBlocking {
|
||||
unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection ->
|
||||
unauthWebSocket.runCatchingWithChatConnection { chatConnection ->
|
||||
UnauthUsernamesService(chatConnection).lookUpUsernameLink(serverId, entropy)
|
||||
}.getOrError()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user