From 0f8fdda884d104495376460067dff509eac7cd03 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Tue, 6 May 2025 13:32:00 -0400 Subject: [PATCH] Revert "Remove message send REST fallback." This reverts commit 7bdfec77caf14a6a1c48ee3419e9e37d2c541e62. --- .../api/SignalServiceMessageSender.java | 46 +++++++---- .../internal/push/PushServiceSocket.java | 77 +++++++++++++++++++ 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index 3c9282c0af..31ada1c468 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -1941,17 +1941,24 @@ public class SignalServiceMessageSender { throw e; } catch (WebSocketUnavailableException e) { String pipe = sealedSenderAccess == null ? "Pipe" : "Unidentified pipe"; - Log.i(TAG, "[sendMessage][" + timestamp + "] " + pipe + " unavailable (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); - throw e; + Log.i(TAG, "[sendMessage][" + timestamp + "] " + pipe + " unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } catch (IOException e) { String pipe = sealedSenderAccess == null ? "Pipe" : "Unidentified pipe"; Throwable cause = e; if (e.getCause() != null) { cause = e.getCause(); } - Log.w(TAG, "[sendMessage][" + timestamp + "] " + pipe + " failed (" + cause.getClass().getSimpleName() + ": " + cause.getMessage() + ")"); - throw (cause instanceof IOException) ? (IOException) cause : e; + Log.w(TAG, "[sendMessage][" + timestamp + "] " + pipe + " failed, falling back... (" + cause.getClass().getSimpleName() + ": " + cause.getMessage() + ")"); } + + if (cancelationSignal != null && cancelationSignal.isCanceled()) { + throw new CancelationException(); + } + + SendMessageResponse response = socket.sendMessage(messages, sealedSenderAccess, story); + + return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || aciStore.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent()); + } catch (InvalidKeyException ike) { Log.w(TAG, ike); if (sealedSenderAccess != null) { @@ -2130,7 +2137,7 @@ public class SignalServiceMessageSender { content.getContent() ); return Single.just(result); - } catch (IOException throwable) { + } catch (Throwable throwable) { if (cancelationSignal != null && cancelationSignal.isCanceled()) { return Single.error(new CancelationException()); } @@ -2145,13 +2152,23 @@ public class SignalServiceMessageSender { // Non-technical failures shouldn't be retried with socket return Single.error(throwable); } else if (throwable instanceof WebSocketUnavailableException) { - Log.i(TAG, "[sendMessage][" + timestamp + "] " + (sealedSenderAccess != null ? "Unidentified " : "") + "pipe unavailable (" + throwable.getClass().getSimpleName() + ": " + throwable.getMessage() + ")"); - return Single.error(throwable); - } else { + Log.i(TAG, "[sendMessage][" + timestamp + "] " + (sealedSenderAccess != null ? "Unidentified " : "") + "pipe unavailable, falling back... (" + throwable.getClass().getSimpleName() + ": " + throwable.getMessage() + ")"); + } else if (throwable instanceof IOException) { Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable; - Log.w(TAG, "[sendMessage][" + timestamp + "] " + (sealedSenderAccess != null ? "Unidentified " : "") + "pipe failed (" + cause.getClass().getSimpleName() + ": " + cause.getMessage() + ")"); - return Single.error((cause instanceof IOException) ? cause : throwable); + Log.w(TAG, "[sendMessage][" + timestamp + "] " + (sealedSenderAccess != null ? "Unidentified " : "") + "pipe failed, falling back... (" + cause.getClass().getSimpleName() + ": " + cause.getMessage() + ")"); } + + return Single.fromCallable(() -> { + SendMessageResponse response = socket.sendMessage(messages, sealedSenderAccess, story); + return SendMessageResult.success( + recipient, + messages.getDevices(), + response.sentUnidentified(), + response.getNeedsSync() || aciStore.isMultiDevice(), + System.currentTimeMillis() - startTime, + content.getContent() + ); + }).subscribeOn(scheduler); } }); @@ -2414,12 +2431,13 @@ public class SignalServiceMessageSender { // Non-technical failures shouldn't be retried with socket throw e; } catch (WebSocketUnavailableException e) { - Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); - throw e; + Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } catch (IOException e) { - Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); - throw e; + Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } + + SendGroupMessageResponse response = socket.sendGroupMessage(ciphertext, sealedSenderAccess, timestamp, online, urgent, story); + return transformGroupResponseToMessageResults(targetInfo.devices, response, content); } catch (GroupMismatchedDevicesException e) { Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices. (" + e.getMessage() + ")"); for (GroupMismatchedDevices mismatched : e.getMismatchedDevices()) { diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index 9b3606f51d..4b13e5d808 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -62,6 +62,7 @@ import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvali import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; import org.whispersystems.signalservice.api.push.exceptions.SubmitVerificationCodeRateLimitException; import org.whispersystems.signalservice.api.push.exceptions.TokenNotAcceptedException; +import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; import org.whispersystems.signalservice.api.registration.RestoreMethodBody; import org.whispersystems.signalservice.api.svr.Svr3Credentials; import org.whispersystems.signalservice.api.util.CredentialsProvider; @@ -74,8 +75,11 @@ import org.whispersystems.signalservice.internal.configuration.SignalUrl; import org.whispersystems.signalservice.internal.crypto.AttachmentDigest; import org.whispersystems.signalservice.internal.push.exceptions.ForbiddenException; import org.whispersystems.signalservice.internal.push.exceptions.GroupExistsException; +import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.GroupNotFoundException; import org.whispersystems.signalservice.internal.push.exceptions.GroupPatchNotAcceptedException; +import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException; +import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException; import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.NotInGroupException; import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; @@ -151,6 +155,8 @@ public class PushServiceSocket { private static final String SET_RESTORE_METHOD_PATH = "/v1/devices/restore_account/%s"; + private static final String GROUP_MESSAGE_PATH = "/v1/messages/multi_recipient?ts=%s&online=%s&urgent=%s&story=%s"; + private static final String ATTACHMENT_KEY_DOWNLOAD_PATH = "attachments/%s"; private static final String ATTACHMENT_ID_DOWNLOAD_PATH = "attachments/%d"; private static final String AVATAR_UPLOAD_PATH = ""; @@ -336,6 +342,77 @@ public class PushServiceSocket { patchVerificationSession(sessionId, gcmRegistrationId, null, null, null, null); } + public SendGroupMessageResponse sendGroupMessage(byte[] body, @Nonnull SealedSenderAccess sealedSenderAccess, long timestamp, boolean online, boolean urgent, boolean story) + throws NonSuccessfulResponseCodeException, PushNetworkException, MalformedResponseException + { + ServiceConnectionHolder connectionHolder = (ServiceConnectionHolder) getRandom(serviceClients, random); + + String path = String.format(Locale.US, GROUP_MESSAGE_PATH, timestamp, online, urgent, story); + + Request.Builder requestBuilder = new Request.Builder(); + requestBuilder.url(String.format("%s%s", connectionHolder.getUrl(), path)); + requestBuilder.put(RequestBody.create(MediaType.get("application/vnd.signal-messenger.mrm"), body)); + requestBuilder.addHeader(sealedSenderAccess.getHeaderName(), sealedSenderAccess.getHeaderValue()); + + if (signalAgent != null) { + requestBuilder.addHeader("X-Signal-Agent", signalAgent); + } + + if (connectionHolder.getHostHeader().isPresent()) { + requestBuilder.addHeader("Host", connectionHolder.getHostHeader().get()); + } + + Call call = connectionHolder.getUnidentifiedClient().newCall(requestBuilder.build()); + + synchronized (connections) { + connections.add(call); + } + + try (Response response = call.execute()) { + switch (response.code()) { + case 200: + return readBodyJson(response.body(), SendGroupMessageResponse.class); + case 401: + throw new InvalidUnidentifiedAccessHeaderException(); + case 404: + throw new NotFoundException("At least one unregistered user in message send."); + case 409: + GroupMismatchedDevices[] mismatchedDevices = readBodyJson(response.body(), GroupMismatchedDevices[].class); + throw new GroupMismatchedDevicesException(mismatchedDevices); + case 410: + GroupStaleDevices[] staleDevices = readBodyJson(response.body(), GroupStaleDevices[].class); + throw new GroupStaleDevicesException(staleDevices); + case 508: + throw new ServerRejectedException(); + default: + throw new NonSuccessfulResponseCodeException(response.code()); + } + } catch (PushNetworkException | NonSuccessfulResponseCodeException | MalformedResponseException e) { + throw e; + } catch (IOException e) { + throw new PushNetworkException(e); + } finally { + synchronized (connections) { + connections.remove(call); + } + } + } + + public SendMessageResponse sendMessage(OutgoingPushMessageList bundle, @Nullable SealedSenderAccess sealedSenderAccess, boolean story) + throws IOException + { + try { + String responseText = makeServiceRequest(String.format("/v1/messages/%s?story=%s", bundle.getDestination(), story ? "true" : "false"), "PUT", JsonUtil.toJson(bundle), NO_HEADERS, NO_HANDLER, sealedSenderAccess); + SendMessageResponse response = JsonUtil.fromJson(responseText, SendMessageResponse.class); + + response.setSentUnidentfied(sealedSenderAccess != null); + + return response; + } catch (NotFoundException nfe) { + throw new UnregisteredUserException(bundle.getDestination(), nfe); + } + } + public void retrieveBackup(int cdnNumber, Map headers, String cdnPath, File destination, long maxSizeBytes, ProgressListener listener) throws MissingConfigurationException, IOException {