diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/dependencies/InstrumentationApplicationDependencyProvider.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/dependencies/InstrumentationApplicationDependencyProvider.kt index f540d0e7d2..c80243ab55 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/dependencies/InstrumentationApplicationDependencyProvider.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/dependencies/InstrumentationApplicationDependencyProvider.kt @@ -26,6 +26,7 @@ import org.thoughtcrime.securesms.testing.runSync import org.thoughtcrime.securesms.testing.success import org.whispersystems.signalservice.api.SignalServiceDataStore import org.whispersystems.signalservice.api.SignalServiceMessageSender +import org.whispersystems.signalservice.api.message.MessageApi import org.whispersystems.signalservice.api.push.TrustStore import org.whispersystems.signalservice.api.websocket.SignalWebSocket import org.whispersystems.signalservice.internal.configuration.SignalCdnUrl @@ -122,12 +123,12 @@ class InstrumentationApplicationDependencyProvider(val application: Application, override fun provideSignalServiceMessageSender( authWebSocket: SignalWebSocket.AuthenticatedWebSocket, - unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, protocolStore: SignalServiceDataStore, - pushServiceSocket: PushServiceSocket + pushServiceSocket: PushServiceSocket, + messageApi: MessageApi ): SignalServiceMessageSender { if (signalServiceMessageSender == null) { - signalServiceMessageSender = spyk(objToCopy = default.provideSignalServiceMessageSender(authWebSocket, unauthWebSocket, protocolStore, pushServiceSocket)) + signalServiceMessageSender = spyk(objToCopy = default.provideSignalServiceMessageSender(authWebSocket, protocolStore, pushServiceSocket, messageApi)) } return signalServiceMessageSender!! } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt index 5d3c64a679..786b944cb9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt @@ -362,7 +362,7 @@ object AppDependencies { fun providePushServiceSocket(signalServiceConfiguration: SignalServiceConfiguration, groupsV2Operations: GroupsV2Operations): PushServiceSocket fun provideGroupsV2Operations(signalServiceConfiguration: SignalServiceConfiguration): GroupsV2Operations fun provideSignalServiceAccountManager(authWebSocket: AccountApi, pushServiceSocket: PushServiceSocket, groupsV2Operations: GroupsV2Operations): SignalServiceAccountManager - fun provideSignalServiceMessageSender(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, protocolStore: SignalServiceDataStore, pushServiceSocket: PushServiceSocket): SignalServiceMessageSender + fun provideSignalServiceMessageSender(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, protocolStore: SignalServiceDataStore, pushServiceSocket: PushServiceSocket, messageApi: MessageApi): SignalServiceMessageSender fun provideSignalServiceMessageReceiver(pushServiceSocket: PushServiceSocket): SignalServiceMessageReceiver fun provideSignalServiceNetworkAccess(): SignalServiceNetworkAccess fun provideRecipientCache(): LiveRecipientCache diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index f248d5e0ad..76dcc2f73b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -151,12 +151,12 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider { } @Override - public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, @NonNull SignalServiceDataStore protocolStore, @NonNull PushServiceSocket pushServiceSocket) { + public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalServiceDataStore protocolStore, @NonNull PushServiceSocket pushServiceSocket, @NonNull MessageApi messageApi) { return new SignalServiceMessageSender(pushServiceSocket, protocolStore, ReentrantSessionLock.INSTANCE, authWebSocket, - unauthWebSocket, + messageApi, Optional.of(new SecurityEventListener(context)), SignalExecutors.newCachedBoundedExecutor("signal-messages", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD, 1, 16, 30), ByteUnit.KILOBYTES.toBytes(256)); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt index 9cb60fa5e5..8f6606396c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt @@ -82,7 +82,7 @@ class NetworkDependenciesModule( val protocolStore: SignalServiceDataStoreImpl by _protocolStore private val _signalServiceMessageSender = resettableLazy { - provider.provideSignalServiceMessageSender(authWebSocket, unauthWebSocket, protocolStore, pushServiceSocket) + provider.provideSignalServiceMessageSender(authWebSocket, protocolStore, pushServiceSocket, messageApi) } val signalServiceMessageSender: SignalServiceMessageSender by _signalServiceMessageSender diff --git a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt index a8c1c4134c..31c2a2cde7 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt @@ -72,9 +72,9 @@ class MockApplicationDependencyProvider : AppDependencies.Provider { override fun provideSignalServiceMessageSender( authWebSocket: SignalWebSocket.AuthenticatedWebSocket, - unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, protocolStore: SignalServiceDataStore, - pushServiceSocket: PushServiceSocket + pushServiceSocket: PushServiceSocket, + messageApi: MessageApi ): SignalServiceMessageSender { return mockk(relaxed = true) } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt index 3327c42ac7..5a59840068 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResult.kt @@ -5,9 +5,9 @@ package org.whispersystems.signalservice.api +import io.reactivex.rxjava3.core.Single import org.signal.core.util.concurrent.safeBlockingGet import org.whispersystems.signalservice.api.NetworkResult.StatusCodeError -import org.whispersystems.signalservice.api.NetworkResult.Success import org.whispersystems.signalservice.api.push.exceptions.MalformedRequestException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException @@ -58,6 +58,45 @@ sealed class NetworkResult( ApplicationError(e) } + /** + * A convenience method to convert a websocket request into a network result, parsing the body into type [T]. + * + * Common HTTP errors will be translated to [StatusCodeError]s. + */ + inline fun fromWebSocket(fetcher: Fetcher>): NetworkResult { + return fromWebSocket(DefaultWebSocketConverter(T::class), fetcher) + } + + /** + * A convenience method to convert a websocket request into a network result, using the provided + * [webSocketResponseConverter] to parse the response into type [T]. + * + * Common HTTP errors will be translated to [StatusCodeError]s. + */ + fun fromWebSocket( + webSocketResponseConverter: WebSocketResponseConverter, + fetcher: Fetcher> + ): NetworkResult { + return try { + val result: Result> = fetcher.fetch() + .map { response: WebsocketResponse -> Result.success(webSocketResponseConverter.convert(response)) } + .onErrorReturn { Result.failure(it) } + .safeBlockingGet() + + result.getOrThrow() + } catch (e: NonSuccessfulResponseCodeException) { + StatusCodeError(e) + } catch (e: IOException) { + NetworkError(e) + } catch (e: TimeoutException) { + NetworkError(PushNetworkException(e)) + } catch (e: InterruptedException) { + NetworkError(PushNetworkException(e)) + } catch (e: Throwable) { + ApplicationError(e) + } + } + /** * A convenience method to convert a websocket request into a network result. * Common HTTP errors will be translated to [StatusCodeError]s. @@ -101,23 +140,8 @@ sealed class NetworkResult( request: WebSocketRequestMessage, timeout: Duration = WebSocketConnection.DEFAULT_SEND_TIMEOUT, webSocketResponseConverter: WebSocketResponseConverter - ): NetworkResult = try { - val result: Result> = signalWebSocket.request(request, timeout) - .map { response: WebsocketResponse -> Result.success(webSocketResponseConverter.convert(response)) } - .onErrorReturn { Result.failure(it) } - .safeBlockingGet() - - result.getOrThrow() - } catch (e: NonSuccessfulResponseCodeException) { - StatusCodeError(e) - } catch (e: IOException) { - NetworkError(e) - } catch (e: TimeoutException) { - NetworkError(PushNetworkException(e)) - } catch (e: InterruptedException) { - NetworkError(PushNetworkException(e)) - } catch (e: Throwable) { - ApplicationError(e) + ): NetworkResult { + return fromWebSocket(webSocketResponseConverter) { signalWebSocket.request(request, timeout) } } /** @@ -187,6 +211,10 @@ sealed class NetworkResult( null } } + + fun header(key: String): String? { + return headers[key.lowercase()] + } } /** Indicates that the application somehow failed in a way unrelated to network activity. Usually a runtime crash. */ @@ -248,6 +276,34 @@ sealed class NetworkResult( } } + /** + * Provides the ability to fallback to [fallback] if the current [NetworkResult] is non-successful. + * + * The [fallback] will only be triggered on non-[Success] results. You can provide a [predicate] to limit what kinds of errors you fallback on + * (the default is to fallback on every error). + * + * This primary usecase of this is to make an unauth websocket request and fallback to auth websocket upon failure. + * + * ```kotlin + * val user: NetworkResult = NetworkResult + * .fromWebSocket { unauthWebSocket.request(request, sealedSenderAccess) } + * .fallback { NetworkResult.fromWebSocket { authWebSocket.request(request) } } + * ``` + * + * @param predicate If this lambda returns true, the fallback will be triggered. + */ + fun fallback(predicate: (NetworkResult) -> Boolean = { true }, fallback: () -> NetworkResult): NetworkResult { + if (this is Success) { + return this + } + + return if (predicate(this)) { + fallback() + } else { + this + } + } + /** * Takes the output of one [NetworkResult] and passes it as the input to another if the operation is successful. * If it's non-successful, the [result] lambda is not run, and instead the original failure will be propagated. @@ -330,6 +386,18 @@ sealed class NetworkResult( fun interface WebSocketResponseConverter { @Throws(Exception::class) fun convert(response: WebsocketResponse): NetworkResult + + fun WebsocketResponse.toStatusCodeError(): NetworkResult { + return StatusCodeError(NonSuccessfulResponseCodeException(this.status, "", this.body, this.headers)) + } + + fun WebsocketResponse.toSuccess(responseJsonClass: KClass): NetworkResult { + return when (responseJsonClass) { + Unit::class -> Success(responseJsonClass.cast(Unit)) + String::class -> Success(responseJsonClass.cast(this.body)) + else -> Success(JsonUtil.fromJson(this.body, responseJsonClass.java)) + } + } } class DefaultWebSocketConverter(private val responseJsonClass: KClass) : WebSocketResponseConverter { @@ -352,15 +420,3 @@ sealed class NetworkResult( } } } - -private fun WebsocketResponse.toStatusCodeError(): NetworkResult { - return StatusCodeError(NonSuccessfulResponseCodeException(this.status, "", this.body, this.headers)) -} - -private fun WebsocketResponse.toSuccess(responseJsonClass: KClass): NetworkResult { - return when (responseJsonClass) { - Unit::class -> Success(responseJsonClass.cast(Unit)) - String::class -> Success(responseJsonClass.cast(this.body)) - else -> Success(JsonUtil.fromJson(this.body, responseJsonClass.java)) - } -} diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResultUtil.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResultUtil.kt index 6c16b1fbd9..77d49f9f6f 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResultUtil.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/NetworkResultUtil.kt @@ -7,7 +7,21 @@ package org.whispersystems.signalservice.api import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException +import org.whispersystems.signalservice.api.push.exceptions.NotFoundException +import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException +import org.whispersystems.signalservice.api.push.exceptions.RateLimitException +import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException +import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException +import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException +import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse +import org.whispersystems.signalservice.internal.push.SendMessageResponse +import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException +import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException +import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException +import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException +import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException import java.io.IOException +import java.util.Optional /** * Bridge layer to convert [NetworkResult]s into the response data or thrown exceptions. @@ -33,9 +47,87 @@ object NetworkResultUtil { is NetworkResult.StatusCodeError -> { when (result.code) { 401, 403 -> throw AuthorizationFailedException(result.code, "Authorization failed!") + 413, 429 -> throw RateLimitException(result.code, "Rate Limited", Optional.ofNullable(result.header("retry-after")?.toLongOrNull())) else -> throw result.exception } } } } + + /** + * Convert [NetworkResult] into expected type exceptions for an individual message send. + */ + @JvmStatic + @Throws( + AuthorizationFailedException::class, + UnregisteredUserException::class, + MismatchedDevicesException::class, + StaleDevicesException::class, + ProofRequiredException::class, + WebSocketUnavailableException::class, + ServerRejectedException::class, + IOException::class + ) + fun toMessageSendLegacy(destination: String, result: NetworkResult): SendMessageResponse { + return when (result) { + is NetworkResult.Success -> result.result + is NetworkResult.ApplicationError -> { + throw when (val error = result.throwable) { + is IOException, is RuntimeException -> error + else -> RuntimeException(error) + } + } + is NetworkResult.NetworkError -> throw result.exception + is NetworkResult.StatusCodeError -> { + throw when (result.code) { + 401 -> AuthorizationFailedException(result.code, "Authorization failed!") + 404 -> UnregisteredUserException(destination, result.exception) + 409 -> MismatchedDevicesException(result.parseJsonBody()) + 410 -> StaleDevicesException(result.parseJsonBody()) + 413, 429 -> RateLimitException(result.code, "Rate Limited", Optional.ofNullable(result.header("retry-after")?.toLongOrNull())) + 428 -> ProofRequiredException(result.parseJsonBody(), result.header("retry-after")?.toLongOrNull() ?: -1) + 508 -> ServerRejectedException() + else -> result.exception + } + } + } + } + + /** + * Convert [NetworkResult] into expected type exceptions for a multi-recipient message send. + */ + @JvmStatic + @Throws( + InvalidUnidentifiedAccessHeaderException::class, + NotFoundException::class, + GroupMismatchedDevicesException::class, + GroupStaleDevicesException::class, + RateLimitException::class, + ServerRejectedException::class, + WebSocketUnavailableException::class, + IOException::class + ) + fun toGroupMessageSendLegacy(result: NetworkResult): SendGroupMessageResponse { + return when (result) { + is NetworkResult.Success -> result.result + is NetworkResult.ApplicationError -> { + throw when (val error = result.throwable) { + is IOException, is RuntimeException -> error + else -> RuntimeException(error) + } + } + is NetworkResult.NetworkError -> throw result.exception + is NetworkResult.StatusCodeError -> { + throw when (result.code) { + 401 -> InvalidUnidentifiedAccessHeaderException() + 404 -> NotFoundException("At least one unregistered user is message send.") + 409 -> GroupMismatchedDevicesException(result.parseJsonBody()) + 410 -> GroupStaleDevicesException(result.parseJsonBody()) + 413, 429 -> throw RateLimitException(result.code, "Rate Limited", Optional.ofNullable(result.header("retry-after")?.toLongOrNull())) + 508 -> ServerRejectedException() + else -> result.exception + } + } + } + } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index e450153cbc..1f5e8402b3 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -20,7 +20,6 @@ import org.signal.libsignal.protocol.message.PlaintextContent; import org.signal.libsignal.protocol.message.SenderKeyDistributionMessage; import org.signal.libsignal.protocol.state.PreKeyBundle; import org.signal.libsignal.protocol.state.SessionRecord; -import org.signal.libsignal.protocol.util.Pair; import org.signal.libsignal.zkgroup.groupsend.GroupSendFullToken; import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil; import org.whispersystems.signalservice.api.crypto.ContentHint; @@ -32,6 +31,7 @@ import org.whispersystems.signalservice.api.crypto.SignalSessionBuilder; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException; import org.whispersystems.signalservice.api.groupsv2.GroupSendEndorsements; +import org.whispersystems.signalservice.api.message.MessageApi; import org.whispersystems.signalservice.api.messages.SendMessageResult; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer; @@ -47,7 +47,6 @@ import org.whispersystems.signalservice.api.messages.SignalServiceStoryMessageRe import org.whispersystems.signalservice.api.messages.SignalServiceTextAttachment; import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage; import org.whispersystems.signalservice.api.messages.calls.AnswerMessage; -import org.whispersystems.signalservice.api.messages.calls.CallingResponse; import org.whispersystems.signalservice.api.messages.calls.IceUpdateMessage; import org.whispersystems.signalservice.api.messages.calls.OfferMessage; import org.whispersystems.signalservice.api.messages.calls.OpaqueMessage; @@ -78,7 +77,6 @@ import org.whispersystems.signalservice.api.push.exceptions.RateLimitException; import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; import org.whispersystems.signalservice.api.services.AttachmentService; -import org.whispersystems.signalservice.api.services.MessagingService; import org.whispersystems.signalservice.api.util.AttachmentPointerUtil; import org.whispersystems.signalservice.api.util.CredentialsProvider; import org.whispersystems.signalservice.api.util.Preconditions; @@ -178,7 +176,7 @@ public class SignalServiceMessageSender { private final IdentityKeyPair localPniIdentity; private final AttachmentService attachmentService; - private final MessagingService messagingService; + private final MessageApi messageApi; private final Scheduler scheduler; private final long maxEnvelopeSize; @@ -187,7 +185,7 @@ public class SignalServiceMessageSender { SignalServiceDataStore store, SignalSessionLock sessionLock, SignalWebSocket.AuthenticatedWebSocket authWebSocket, - SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, + MessageApi messageApi, Optional eventListener, ExecutorService executor, long maxEnvelopeSize) @@ -201,7 +199,7 @@ public class SignalServiceMessageSender { this.localDeviceId = credentialsProvider.getDeviceId(); this.localPni = credentialsProvider.getPni(); this.attachmentService = new AttachmentService(authWebSocket); - this.messagingService = new MessagingService(authWebSocket, unauthWebSocket); + this.messageApi = messageApi; this.eventListener = eventListener; this.maxEnvelopeSize = maxEnvelopeSize; this.localPniIdentity = store.pni().getIdentityKeyPair(); @@ -1940,9 +1938,14 @@ public class SignalServiceMessageSender { } try { - SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, sealedSenderAccess, story).blockingGet()).getResultOrThrow(); + SendMessageResponse response = NetworkResultUtil.toMessageSendLegacy(messages.getDestination(), messageApi.sendMessage(messages, sealedSenderAccess, story)); return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || aciStore.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent()); - } catch (InvalidUnidentifiedAccessHeaderException | UnregisteredUserException | MismatchedDevicesException | StaleDevicesException e) { + } catch (AuthorizationFailedException | + UnregisteredUserException | + MismatchedDevicesException | + StaleDevicesException | + ProofRequiredException | + ServerRejectedException e) { // Non-technical failures shouldn't be retried with socket throw e; } catch (WebSocketUnavailableException e) { @@ -2121,16 +2124,19 @@ public class SignalServiceMessageSender { return Single.error(new CancelationException()); } - return messagingService.send(messages, sealedSenderAccess, story) - .map(r -> new kotlin.Pair<>(messages, r)); + return Single.fromCallable(() -> messageApi.sendMessage(messages, sealedSenderAccess, story)) + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.io()) + .onErrorReturn(NetworkResult.ApplicationError::new) + .map(r -> new kotlin.Pair<>(messages, r)); }) .observeOn(scheduler) .flatMap(pair -> { - final OutgoingPushMessageList messages = pair.getFirst(); - final ServiceResponse serviceResponse = pair.getSecond(); + final OutgoingPushMessageList messages = pair.getFirst(); + final NetworkResult networkResult = pair.getSecond(); - if (serviceResponse.getResult().isPresent()) { - SendMessageResponse response = serviceResponse.getResult().get(); + try { + SendMessageResponse response = NetworkResultUtil.toMessageSendLegacy(messages.getDestination(), networkResult); SendMessageResult result = SendMessageResult.success( recipient, messages.getDevices(), @@ -2140,18 +2146,17 @@ public class SignalServiceMessageSender { content.getContent() ); return Single.just(result); - } else { + } catch (Throwable throwable) { if (cancelationSignal != null && cancelationSignal.isCanceled()) { return Single.error(new CancelationException()); } - //noinspection OptionalGetWithoutIsPresent - Throwable throwable = serviceResponse.getApplicationError().or(serviceResponse::getExecutionError).get(); - - if (throwable instanceof InvalidUnidentifiedAccessHeaderException || + if (throwable instanceof AuthorizationFailedException || throwable instanceof UnregisteredUserException || throwable instanceof MismatchedDevicesException || - throwable instanceof StaleDevicesException) + throwable instanceof StaleDevicesException || + throwable instanceof ProofRequiredException || + throwable instanceof ServerRejectedException) { // Non-technical failures shouldn't be retried with socket return Single.error(throwable); @@ -2424,9 +2429,14 @@ public class SignalServiceMessageSender { try { try { - SendGroupMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.sendToGroup(ciphertext, sealedSenderAccess, timestamp, online, urgent, story).blockingGet()).getResultOrThrow(); + + SendGroupMessageResponse response = NetworkResultUtil.toGroupMessageSendLegacy(messageApi.sendGroupMessage(ciphertext, sealedSenderAccess, timestamp, online, urgent, story)); return transformGroupResponseToMessageResults(targetInfo.devices, response, content); - } catch (InvalidUnidentifiedAccessHeaderException | NotFoundException | GroupMismatchedDevicesException | GroupStaleDevicesException e) { + } catch (InvalidUnidentifiedAccessHeaderException | + NotFoundException | + GroupMismatchedDevicesException | + GroupStaleDevicesException | + ServerRejectedException e) { // Non-technical failures shouldn't be retried with socket throw e; } catch (WebSocketUnavailableException e) { diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt index 9fcaa4ec0f..b66a50ea79 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/message/MessageApi.kt @@ -6,10 +6,16 @@ package org.whispersystems.signalservice.api.message import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.crypto.SealedSenderAccess import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.websocket.SignalWebSocket import org.whispersystems.signalservice.internal.post +import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList +import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse +import org.whispersystems.signalservice.internal.push.SendMessageResponse +import org.whispersystems.signalservice.internal.put import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage +import org.whispersystems.signalservice.internal.websocket.WebsocketResponse /** * Collection of endpoints for operating on messages. @@ -19,6 +25,70 @@ class MessageApi( private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket ) { + companion object { + /** + * Adjust the default parsing of [SendMessageResponse] to set the non-server returned [SendMessageResponse.sentUnidentfied] + * flag on the model. + */ + private val sendMessageResponseConverter = object : NetworkResult.WebSocketResponseConverter { + override fun convert(response: WebsocketResponse): NetworkResult { + return if (response.status == 200) { + response.toSuccess(SendMessageResponse::class) + .map { it.apply { setSentUnidentfied(response.isUnidentified) } } + } else { + response.toStatusCodeError() + } + } + } + } + + /** + * Sends a message to a single recipient, using the appropriate initial authentication style based on presence of [sealedSenderAccess], but + * will automatically fallback to auth if that fails specifically because of an invalid [sealedSenderAccess]. + * + * PUT /v1/messages/[messageList]`.destination`?story=[story] + * - 200: Success + * - 401: Message is not a story and authorization or [sealedSenderAccess] is missing or incorrect + * - 404: Message is not a story and recipient is not a registered Signal user + * - 409: Mismatched devices + * - 410: Stale devices + * - 428: Sender proof required + */ + fun sendMessage(messageList: OutgoingPushMessageList, sealedSenderAccess: SealedSenderAccess?, story: Boolean): NetworkResult { + val request = WebSocketRequestMessage.put("/v1/messages/${messageList.destination}?story=${story.toQueryParam()}", messageList) + + return if (sealedSenderAccess == null) { + NetworkResult.fromWebSocket(sendMessageResponseConverter) { authWebSocket.request(request) } + } else { + NetworkResult.fromWebSocket(sendMessageResponseConverter) { unauthWebSocket.request(request, sealedSenderAccess) } + .fallback( + predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, + fallback = { NetworkResult.fromWebSocket(sendMessageResponseConverter) { authWebSocket.request(request) } } + ) + } + } + + /** + * Sends a common message to multiple recipients and requires some form of [sealedSenderAccess] unless it's a story. + * + * PUT /v1/messages/multi_recipient?ts=[timestamp]&online=[online]&urgent=[urgent]&story=[story] + * - 200: Success + * - 400: Message specified delivery to the same recipient device multiple times + * - 401: Message is not a story and [sealedSenderAccess] is missing or incorrect + * - 404: Message is not a story and some of the recipients are not registered Signal users + * - 409: Incorrect set of devices supplied for some recipients + * - 410: Stale devices supplied for some recipients + */ + fun sendGroupMessage(body: ByteArray, sealedSenderAccess: SealedSenderAccess, timestamp: Long, online: Boolean, urgent: Boolean, story: Boolean): NetworkResult { + val request = WebSocketRequestMessage.put( + path = "/v1/messages/multi_recipient?ts=$timestamp&online=${online.toQueryParam()}&urgent=${urgent.toQueryParam()}&story=${story.toQueryParam()}", + body = body, + headers = mapOf("content-type" to "application/vnd.signal-messenger.mrm") + ) + + return NetworkResult.fromWebSocket { unauthWebSocket.request(request, sealedSenderAccess) } + } + /** * Report a message sender and message id as spam. * @@ -29,4 +99,6 @@ class MessageApi( val request = WebSocketRequestMessage.post("/v1/messages/report/$serviceId/$serverGuid", SpamTokenMessage(reportingToken)) return NetworkResult.fromWebSocketRequest(authWebSocket, request) } + + private fun Boolean.toQueryParam(): String = if (this) "true" else "false" } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java deleted file mode 100644 index 5abb4502f1..0000000000 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java +++ /dev/null @@ -1,129 +0,0 @@ -package org.whispersystems.signalservice.api.services; - -import org.whispersystems.signalservice.api.crypto.SealedSenderAccess; -import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; -import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; -import org.whispersystems.signalservice.api.websocket.SignalWebSocket; -import org.whispersystems.signalservice.internal.ServiceResponse; -import org.whispersystems.signalservice.internal.ServiceResponseProcessor; -import org.whispersystems.signalservice.internal.push.GroupMismatchedDevices; -import org.whispersystems.signalservice.internal.push.GroupStaleDevices; -import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; -import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; -import org.whispersystems.signalservice.internal.push.SendMessageResponse; -import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException; -import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException; -import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException; -import org.whispersystems.signalservice.internal.util.JsonUtil; -import org.whispersystems.signalservice.internal.util.Util; -import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper; -import org.whispersystems.signalservice.internal.websocket.ResponseMapper; -import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage; - -import java.security.SecureRandom; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import io.reactivex.rxjava3.core.Single; -import okio.ByteString; - -/** - * Provide WebSocket based interface to message sending endpoints. - *

- * Note: To be expanded to have REST fallback and other messaging related operations. - */ -public class MessagingService { - private final SignalWebSocket.AuthenticatedWebSocket authWebSocket; - private final SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket; - - public MessagingService(SignalWebSocket.AuthenticatedWebSocket authWebSocket, SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket) { - this.authWebSocket = authWebSocket; - this.unauthWebSocket = unauthWebSocket; - } - - public Single> send(OutgoingPushMessageList list, - @Nullable SealedSenderAccess sealedSenderAccess, - boolean story) { - List headers = new LinkedList() {{ - add("content-type:application/json"); - }}; - - WebSocketRequestMessage requestMessage = new WebSocketRequestMessage.Builder() - .id(new SecureRandom().nextLong()) - .verb("PUT") - .path(String.format("/v1/messages/%s?story=%s", list.getDestination(), story ? "true" : "false")) - .headers(headers) - .body(ByteString.of(JsonUtil.toJson(list).getBytes())) - .build(); - - ResponseMapper responseMapper = DefaultResponseMapper.extend(SendMessageResponse.class) - .withResponseMapper((status, body, getHeader, unidentified) -> { - SendMessageResponse sendMessageResponse = Util.isEmpty(body) ? new SendMessageResponse(false, unidentified) - : JsonUtil.fromJsonResponse(body, SendMessageResponse.class); - sendMessageResponse.setSentUnidentfied(unidentified); - - return ServiceResponse.forResult(sendMessageResponse, status, body); - }) - .withCustomError(404, (status, body, getHeader) -> new UnregisteredUserException(list.getDestination(), new NotFoundException("not found"))) - .build(); - - if (sealedSenderAccess == null) { - return authWebSocket.request(requestMessage) - .map(responseMapper::map) - .onErrorReturn(ServiceResponse::forUnknownError); - } else { - return unauthWebSocket.request(requestMessage, sealedSenderAccess) - .flatMap(response -> { - if (response.getStatus() == 401) { - return authWebSocket.request(requestMessage); - } else { - return Single.just(response); - } - }) - .map(responseMapper::map) - .onErrorReturn(ServiceResponse::forUnknownError); - } - } - - public Single> sendToGroup(byte[] body, @Nonnull SealedSenderAccess sealedSenderAccess, long timestamp, boolean online, boolean urgent, boolean story) { - List headers = new LinkedList() {{ - add("content-type:application/vnd.signal-messenger.mrm"); - add(sealedSenderAccess.getHeader()); - }}; - - String path = String.format(Locale.US, "/v1/messages/multi_recipient?ts=%s&online=%s&urgent=%s&story=%s", timestamp, online, urgent, story); - - WebSocketRequestMessage requestMessage = new WebSocketRequestMessage.Builder() - .id(new SecureRandom().nextLong()) - .verb("PUT") - .path(path) - .headers(headers) - .body(ByteString.of(body)) - .build(); - - return unauthWebSocket.request(requestMessage) - .map(DefaultResponseMapper.extend(SendGroupMessageResponse.class) - .withCustomError(401, (status, errorBody, getHeader) -> new InvalidUnidentifiedAccessHeaderException()) - .withCustomError(404, (status, errorBody, getHeader) -> new NotFoundException("At least one unregistered user in message send.")) - .withCustomError(409, (status, errorBody, getHeader) -> { - GroupMismatchedDevices[] mismatchedDevices = JsonUtil.fromJsonResponse(errorBody, GroupMismatchedDevices[].class); - return new GroupMismatchedDevicesException(mismatchedDevices); - }) - .withCustomError(410, (status, errorBody, getHeader) -> { - GroupStaleDevices[] staleDevices = JsonUtil.fromJsonResponse(errorBody, GroupStaleDevices[].class); - return new GroupStaleDevicesException(staleDevices); - }) - .build()::map) - .onErrorReturn(ServiceResponse::forUnknownError); - } - - public static class SendResponseProcessor extends ServiceResponseProcessor { - public SendResponseProcessor(ServiceResponse response) { - super(response); - } - } -} diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/WebSocketRequestExt.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/WebSocketRequestExt.kt index 75c83be602..576bb1c8fb 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/WebSocketRequestExt.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/WebSocketRequestExt.kt @@ -5,6 +5,7 @@ package org.whispersystems.signalservice.internal +import okio.ByteString.Companion.toByteString import org.whispersystems.signalservice.internal.util.JsonUtil import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage import java.security.SecureRandom @@ -59,6 +60,19 @@ fun WebSocketRequestMessage.Companion.put(path: String, body: Any, headers: Map< ) } +/** + * Create a custom PUT web socket request, where body and content type header are provided by caller. + */ +fun WebSocketRequestMessage.Companion.put(path: String, body: ByteArray, headers: Map): WebSocketRequestMessage { + return WebSocketRequestMessage( + verb = "PUT", + path = path, + headers = headers.toHeaderList(), + body = body.toByteString(), + id = SecureRandom().nextLong() + ) +} + private fun Map.toHeaderList(): List { return map { (key, value) -> "$key:$value" } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index 916ccd3a8c..a5c21a7cb4 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -202,7 +202,6 @@ public class PushServiceSocket { private static final String SET_RESTORE_METHOD_PATH = "/v1/devices/restore_account/%s"; private static final String WAIT_RESTORE_METHOD_PATH = "/v1/devices/restore_account/%s?timeout=%s"; - private static final String MESSAGE_PATH = "/v1/messages/%s"; private static final String GROUP_MESSAGE_PATH = "/v1/messages/multi_recipient?ts=%s&online=%s&urgent=%s&story=%s"; private static final String ATTACHMENT_V4_PATH = "/v4/attachments/form/upload"; @@ -532,28 +531,6 @@ public class PushServiceSocket { } } - public SignalServiceMessagesResult getMessages(boolean allowStories) throws IOException { - Map headers = Collections.singletonMap("X-Signal-Receive-Stories", allowStories ? "true" : "false"); - - try (Response response = makeServiceRequest(String.format(MESSAGE_PATH, ""), "GET", (RequestBody) null, headers, NO_HANDLER, SealedSenderAccess.NONE, false)) { - validateServiceResponse(response); - - List envelopes = readBodyJson(response.body(), SignalServiceEnvelopeEntityList.class).getMessages(); - - long serverDeliveredTimestamp = 0; - try { - String stringValue = response.header(SERVER_DELIVERED_TIMESTAMP_HEADER); - stringValue = stringValue != null ? stringValue : "0"; - - serverDeliveredTimestamp = Long.parseLong(stringValue); - } catch (NumberFormatException e) { - Log.w(TAG, e); - } - - return new SignalServiceMessagesResult(envelopes, serverDeliveredTimestamp); - } - } - public void registerPreKeys(PreKeyUpload preKeyUpload) throws IOException {