From 523e21f3be6a8a35231336d3085ad5fe6ff2db0e Mon Sep 17 00:00:00 2001 From: AsamK Date: Sat, 22 Jan 2022 16:59:06 +0100 Subject: [PATCH] Improve handling of group send errors over websocket. - Correctly parse error responses from send group message via websocket. - Reduce logging output for mismatched/stale devices exceptions. - Only fallback from websocket to socket if there were technical errors. Closes #11918 --- .../api/SignalServiceMessageSender.java | 35 ++++++++++++------- .../api/services/MessagingService.java | 19 ++++++++-- .../websocket/DefaultErrorMapper.java | 6 +++- .../websocket/DefaultResponseMapper.java | 7 +++- .../internal/websocket/ErrorMapper.java | 3 +- 5 files changed, 53 insertions(+), 17 deletions(-) diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index e9f2ee2778..09eda89e47 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -68,6 +68,7 @@ import org.whispersystems.signalservice.api.push.SignalServiceAddress; import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; +import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException; import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; @@ -107,6 +108,7 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Verifi import org.whispersystems.signalservice.internal.push.StaleDevices; import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException; +import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException; import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException; import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutputStreamFactory; @@ -1640,6 +1642,9 @@ public class SignalServiceMessageSender { try { SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, Optional.absent()).blockingGet()).getResultOrThrow(); return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent()); + } catch (InvalidUnidentifiedAccessHeaderException | UnregisteredUserException | MismatchedDevicesException | StaleDevicesException e) { + // Non-technical failures shouldn't be retried with socket + throw e; } catch (WebSocketUnavailableException e) { Log.i(TAG, "[sendMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } catch (IOException e) { @@ -1650,6 +1655,9 @@ public class SignalServiceMessageSender { try { SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, unidentifiedAccess).blockingGet()).getResultOrThrow(); return SendMessageResult.success(recipient, messages.getDevices(), response.sentUnidentified(), response.getNeedsSync() || store.isMultiDevice(), System.currentTimeMillis() - startTime, content.getContent()); + } catch (InvalidUnidentifiedAccessHeaderException | UnregisteredUserException | MismatchedDevicesException | StaleDevicesException e) { + // Non-technical failures shouldn't be retried with socket + throw e; } catch (WebSocketUnavailableException e) { Log.i(TAG, "[sendMessage][" + timestamp + "] Unidentified pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } catch (IOException e) { @@ -1677,10 +1685,10 @@ public class SignalServiceMessageSender { throw afe; } } catch (MismatchedDevicesException mde) { - Log.w(TAG, mde); + Log.w(TAG, "[sendMessage][" + timestamp + "] Handling mismatched devices. (" + mde.getMessage() + ")"); handleMismatchedDevices(socket, recipient, mde.getMismatchedDevices()); } catch (StaleDevicesException ste) { - Log.w(TAG, ste); + Log.w(TAG, "[sendMessage][" + timestamp + "] Handling stale devices. (" + ste.getMessage() + ")"); handleStaleDevices(recipient, ste.getStaleDevices()); } } @@ -1800,25 +1808,28 @@ public class SignalServiceMessageSender { } try { - SendGroupMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).blockingGet()).getResultOrThrow(); - return transformGroupResponseToMessageResults(targetInfo.devices, response, content); - } catch (WebSocketUnavailableException e) { - Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); - } catch (IOException e) { - Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); - } + try { + SendGroupMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).blockingGet()).getResultOrThrow(); + return transformGroupResponseToMessageResults(targetInfo.devices, response, content); + } catch (InvalidUnidentifiedAccessHeaderException | NotFoundException | GroupMismatchedDevicesException | GroupStaleDevicesException e) { + // Non-technical failures shouldn't be retried with socket + throw e; + } catch (WebSocketUnavailableException e) { + Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } catch (IOException e) { + Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } - try { SendGroupMessageResponse response = socket.sendGroupMessage(ciphertext, joinedUnidentifiedAccess, timestamp, online); return transformGroupResponseToMessageResults(targetInfo.devices, response, content); } catch (GroupMismatchedDevicesException e) { - Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices.", e); + Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices. (" + e.getMessage() + ")"); for (GroupMismatchedDevices mismatched : e.getMismatchedDevices()) { SignalServiceAddress address = new SignalServiceAddress(ACI.parseOrThrow(mismatched.getUuid()), Optional.absent()); handleMismatchedDevices(socket, address, mismatched.getDevices()); } } catch (GroupStaleDevicesException e) { - Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling stale devices.", e); + Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling stale devices. (" + e.getMessage() + ")"); for (GroupStaleDevices stale : e.getStaleDevices()) { SignalServiceAddress address = new SignalServiceAddress(ACI.parseOrThrow(stale.getUuid()), Optional.absent()); handleStaleDevices(address, stale.getDevices()); diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java index 0655ba734f..533b104d66 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java @@ -2,7 +2,6 @@ package org.whispersystems.signalservice.api.services; import com.google.protobuf.ByteString; -import org.whispersystems.libsignal.logging.Log; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalWebSocket; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; @@ -10,9 +9,14 @@ import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; import org.whispersystems.signalservice.internal.ServiceResponse; import org.whispersystems.signalservice.internal.ServiceResponseProcessor; +import org.whispersystems.signalservice.internal.push.GroupMismatchedDevices; +import org.whispersystems.signalservice.internal.push.GroupStaleDevices; import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; import org.whispersystems.signalservice.internal.push.SendMessageResponse; +import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException; +import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException; +import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException; import org.whispersystems.signalservice.internal.util.JsonUtil; import org.whispersystems.signalservice.internal.util.Util; import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper; @@ -85,7 +89,18 @@ public class MessagingService { .build(); return signalWebSocket.request(requestMessage) - .map(DefaultResponseMapper.getDefault(SendGroupMessageResponse.class)::map) + .map(DefaultResponseMapper.extend(SendGroupMessageResponse.class) + .withCustomError(401, (status, errorBody, getHeader) -> new InvalidUnidentifiedAccessHeaderException()) + .withCustomError(404, (status, errorBody, getHeader) -> new NotFoundException("At least one unregistered user in message send.")) + .withCustomError(409, (status, errorBody, getHeader) -> { + GroupMismatchedDevices[] mismatchedDevices = JsonUtil.fromJsonResponse(errorBody, GroupMismatchedDevices[].class); + return new GroupMismatchedDevicesException(mismatchedDevices); + }) + .withCustomError(410, (status, errorBody, getHeader) -> { + GroupStaleDevices[] staleDevices = JsonUtil.fromJsonResponse(errorBody, GroupStaleDevices[].class); + return new GroupStaleDevicesException(staleDevices); + }) + .build()::map) .onErrorReturn(ServiceResponse::forUnknownError); } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java index b357ba7309..86ec4ddec5 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java @@ -66,7 +66,11 @@ public final class DefaultErrorMapper implements ErrorMapper { @Override public Throwable parseError(int status, String body, Function getHeader) { if (customErrorMappers.containsKey(status)) { - return customErrorMappers.get(status).parseError(status, body, getHeader); + try { + return customErrorMappers.get(status).parseError(status, body, getHeader); + } catch (MalformedResponseException e) { + return e; + } } switch (status) { diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java index 3b39dd20e1..924e3338be 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java @@ -42,7 +42,12 @@ public class DefaultResponseMapper implements ResponseMapper @Override public ServiceResponse map(int status, String body, Function getHeader, boolean unidentified) { - Throwable applicationError = errorMapper.parseError(status, body, getHeader); + Throwable applicationError; + try { + applicationError = errorMapper.parseError(status, body, getHeader); + } catch (MalformedResponseException e) { + applicationError = e; + } if (applicationError == null) { try { if (customResponseMapper != null) { diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java index 344d320e64..c7812da265 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java @@ -1,6 +1,7 @@ package org.whispersystems.signalservice.internal.websocket; import org.whispersystems.libsignal.util.guava.Function; +import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; /** * Can map an API response to an appropriate {@link Throwable}. @@ -9,5 +10,5 @@ import org.whispersystems.libsignal.util.guava.Function; * {@link DefaultErrorMapper}. */ public interface ErrorMapper { - Throwable parseError(int status, String body, Function getHeader); + Throwable parseError(int status, String body, Function getHeader) throws MalformedResponseException; }