From 5d6d78a51e007142b8d58a36feeaafcb01262949 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Fri, 9 Jul 2021 10:01:24 -0400 Subject: [PATCH] Initial WebSocket refactor. --- .../securesms/ApplicationContext.java | 2 +- .../contacts/sync/DirectoryHelper.java | 66 +-- .../dependencies/ApplicationDependencies.java | 24 +- .../ApplicationDependencyProvider.java | 43 +- .../securesms/jobs/RetrieveProfileJob.java | 137 +++--- .../messages/IncomingMessageObserver.java | 60 +-- .../securesms/messages/WebsocketStrategy.java | 19 +- .../confirm/ConfirmPaymentRepository.java | 3 +- .../securesms/util/ProfileUtil.java | 86 +--- .../core/util/concurrent/SignalExecutors.java | 11 +- .../api/SignalServiceMessagePipe.java | 409 ------------------ .../api/SignalServiceMessageReceiver.java | 55 --- .../api/SignalServiceMessageSender.java | 155 +++---- .../signalservice/api/SignalWebSocket.java | 220 ++++++++++ .../api/services/AttachmentService.java | 56 +++ .../api/services/MessagingService.java | 94 ++++ .../api/services/ProfileService.java | 161 +++++++ .../api/websocket/WebSocketFactory.java | 8 + .../WebSocketUnavailableException.java | 14 + .../internal/ServiceResponse.java | 99 +++++ .../internal/ServiceResponseProcessor.java | 123 ++++++ .../internal/push/LockedException.java | 2 +- .../internal/push/PushServiceSocket.java | 8 +- .../signalservice/internal/util/JsonUtil.java | 10 + .../websocket/DefaultErrorMapper.java | 149 +++++++ .../websocket/DefaultResponseMapper.java | 86 ++++ .../internal/websocket/ErrorMapper.java | 13 + .../internal/websocket/ResponseMapper.java | 23 + .../websocket/WebSocketConnection.java | 162 ++++--- .../websocket/WebSocketEventListener.java | 9 - 30 files changed, 1438 insertions(+), 869 deletions(-) delete mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/AttachmentService.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/ProfileService.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketFactory.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketUnavailableException.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponse.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponseProcessor.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java create mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ResponseMapper.java delete mode 100644 libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketEventListener.java diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index a89dd47e4b..b832b707f7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -127,7 +127,7 @@ public class ApplicationContext extends MultiDexApplication implements AppForegr }) .addBlocking("crash-handling", this::initializeCrashHandling) .addBlocking("rx-init", () -> { - RxJavaPlugins.setInitIoSchedulerHandler(schedulerSupplier -> Schedulers.from(SignalExecutors.UNBOUNDED, true, false)); + RxJavaPlugins.setInitIoSchedulerHandler(schedulerSupplier -> Schedulers.from(SignalExecutors.BOUNDED_IO, true, false)); RxJavaPlugins.setInitComputationSchedulerHandler(schedulerSupplier -> Schedulers.from(SignalExecutors.BOUNDED, true, false)); }) .addBlocking("eat-db", () -> DatabaseFactory.getInstance(this)) diff --git a/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/DirectoryHelper.java b/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/DirectoryHelper.java index 8f9c82b9b2..289ae20f92 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/DirectoryHelper.java +++ b/app/src/main/java/org/thoughtcrime/securesms/contacts/sync/DirectoryHelper.java @@ -53,8 +53,9 @@ import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.profiles.ProfileAndCredential; import org.whispersystems.signalservice.api.profiles.SignalServiceProfile; import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; +import org.whispersystems.signalservice.api.services.ProfileService; import org.whispersystems.signalservice.api.util.UuidUtil; -import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; +import org.whispersystems.signalservice.internal.ServiceResponse; import java.io.IOException; import java.util.Calendar; @@ -66,9 +67,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.Schedulers; /** * Manages all the stuff around determining if a user is registered or not. @@ -131,7 +133,7 @@ public class DirectoryHelper { Stopwatch stopwatch = new Stopwatch("single"); RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context); RegisteredState originalRegisteredState = recipient.resolve().getRegistered(); - RegisteredState newRegisteredState = null; + RegisteredState newRegisteredState; if (recipient.hasUuid() && !recipient.hasE164()) { boolean isRegistered = isUuidRegistered(context, recipient); @@ -510,29 +512,34 @@ public class DirectoryHelper { .filter(r -> hasCommunicatedWith(context, r)) .toList(); - List>> futures = Stream.of(possiblyUnlisted) - .map(r -> new Pair<>(r, ProfileUtil.retrieveProfile(context, r, SignalServiceProfile.RequestType.PROFILE))) - .toList(); - Set potentiallyActiveIds = new HashSet<>(); - Set retries = new HashSet<>(); + ProfileService profileService = new ProfileService(ApplicationDependencies.getGroupsV2Operations().getProfileOperations(), + ApplicationDependencies.getSignalServiceMessageReceiver(), + ApplicationDependencies.getSignalWebSocket()); - Stream.of(futures) - .forEach(pair -> { - try { - pair.second().get(5, TimeUnit.SECONDS); - potentiallyActiveIds.add(pair.first().getId()); - } catch (InterruptedException | TimeoutException e) { - retries.add(pair.first().getId()); - potentiallyActiveIds.add(pair.first().getId()); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof NotFoundException)) { - retries.add(pair.first().getId()); - potentiallyActiveIds.add(pair.first().getId()); - } - } - }); + List>>> requests = Stream.of(possiblyUnlisted) + .map(r -> ProfileUtil.retrieveProfile(context, r, SignalServiceProfile.RequestType.PROFILE, profileService) + .toObservable() + .timeout(5, TimeUnit.SECONDS) + .onErrorReturn(t -> new Pair<>(r, ServiceResponse.forUnknownError(t)))) + .toList(); - return new UnlistedResult(potentiallyActiveIds, retries); + return Observable.mergeDelayError(requests) + .observeOn(Schedulers.io(), true) + .scan(new UnlistedResult.Builder(), (builder, pair) -> { + Recipient recipient = pair.first(); + ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second()); + if (processor.hasResult()) { + builder.potentiallyActiveIds.add(recipient.getId()); + } else if (processor.genericIoError() || !processor.notFound()) { + builder.retries.add(recipient.getId()); + builder.potentiallyActiveIds.add(recipient.getId()); + } + + return builder; + }) + .lastOrError() + .map(UnlistedResult.Builder::build) + .blockingGet(); } private static boolean hasCommunicatedWith(@NonNull Context context, @NonNull Recipient recipient) { @@ -584,6 +591,15 @@ public class DirectoryHelper { @NonNull Set getRetries() { return retries; } + + private static class Builder { + final Set potentiallyActiveIds = new HashSet<>(); + final Set retries = new HashSet<>(); + + @NonNull UnlistedResult build() { + return new UnlistedResult(potentiallyActiveIds, retries); + } + } } private static class AccountHolder { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java index 2210b83d4d..280dee7125 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java @@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.KbsEnclave; import org.thoughtcrime.securesms.components.TypingStatusRepository; import org.thoughtcrime.securesms.components.TypingStatusSender; import org.thoughtcrime.securesms.database.DatabaseObserver; +import org.thoughtcrime.securesms.database.PendingRetryReceiptCache; import org.thoughtcrime.securesms.groups.GroupsV2Authorization; import org.thoughtcrime.securesms.groups.GroupsV2AuthorizationMemoryValueCache; import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor; @@ -18,7 +19,6 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; -import org.thoughtcrime.securesms.database.PendingRetryReceiptCache; import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor; import org.thoughtcrime.securesms.notifications.MessageNotifier; @@ -41,6 +41,7 @@ import org.whispersystems.signalservice.api.KeyBackupService; import org.whispersystems.signalservice.api.SignalServiceAccountManager; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import org.whispersystems.signalservice.api.SignalServiceMessageSender; +import org.whispersystems.signalservice.api.SignalWebSocket; import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations; import okhttp3.OkHttpClient; @@ -90,6 +91,7 @@ public class ApplicationDependencies { private static volatile OkHttpClient okHttpClient; private static volatile PendingRetryReceiptManager pendingRetryReceiptManager; private static volatile PendingRetryReceiptCache pendingRetryReceiptCache; + private static volatile SignalWebSocket signalWebSocket; @MainThread public static void init(@NonNull Application application, @NonNull Provider provider) { @@ -184,12 +186,9 @@ public class ApplicationDependencies { synchronized (LOCK) { if (messageSender == null) { - messageSender = provider.provideSignalServiceMessageSender(); + messageSender = provider.provideSignalServiceMessageSender(getSignalWebSocket()); } else { - messageSender.update( - IncomingMessageObserver.getPipe(), - IncomingMessageObserver.getUnidentifiedPipe(), - TextSecurePreferences.isMultiDevice(application)); + messageSender.update(TextSecurePreferences.isMultiDevice(application)); } return messageSender; } @@ -492,12 +491,22 @@ public class ApplicationDependencies { return pendingRetryReceiptCache; } + public static @NonNull SignalWebSocket getSignalWebSocket() { + if (signalWebSocket == null) { + synchronized (LOCK) { + if (signalWebSocket == null) { + signalWebSocket = provider.provideSignalWebSocket(); + } + } + } + return signalWebSocket; + } public interface Provider { @NonNull PipeConnectivityListener providePipeListener(); @NonNull GroupsV2Operations provideGroupsV2Operations(); @NonNull SignalServiceAccountManager provideSignalServiceAccountManager(); - @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(); + @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket); @NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver(); @NonNull SignalServiceNetworkAccess provideSignalServiceNetworkAccess(); @NonNull IncomingMessageProcessor provideIncomingMessageProcessor(); @@ -521,5 +530,6 @@ public class ApplicationDependencies { @NonNull SignalCallManager provideSignalCallManager(); @NonNull PendingRetryReceiptManager providePendingRetryReceiptManager(); @NonNull PendingRetryReceiptCache providePendingRetryReceiptCache(); + @NonNull SignalWebSocket provideSignalWebSocket(); } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 49764d9ed8..93f3893daf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -59,11 +59,14 @@ import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceAccountManager; import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import org.whispersystems.signalservice.api.SignalServiceMessageSender; +import org.whispersystems.signalservice.api.SignalWebSocket; import org.whispersystems.signalservice.api.groupsv2.ClientZkOperations; import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations; import org.whispersystems.signalservice.api.util.CredentialsProvider; import org.whispersystems.signalservice.api.util.SleepTimer; import org.whispersystems.signalservice.api.util.UptimeSleepTimer; +import org.whispersystems.signalservice.api.websocket.WebSocketFactory; +import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; import java.util.UUID; @@ -106,15 +109,14 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr } @Override - public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender() { + public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket) { return new SignalServiceMessageSender(provideSignalServiceNetworkAccess().getConfiguration(context), new DynamicCredentialsProvider(context), new SignalProtocolStoreImpl(context), ReentrantSessionLock.INSTANCE, BuildConfig.SIGNAL_AGENT, TextSecurePreferences.isMultiDevice(context), - Optional.fromNullable(IncomingMessageObserver.getPipe()), - Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()), + signalWebSocket, Optional.of(new SecurityEventListener(context)), provideClientZkOperations().getProfileOperations(), SignalExecutors.newCachedBoundedExecutor("signal-messages", 1, 16), @@ -261,6 +263,41 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr return new PendingRetryReceiptCache(context); } + @Override + public @NonNull SignalWebSocket provideSignalWebSocket() { + return new SignalWebSocket(provideWebSocketFactory()); + } + + private @NonNull WebSocketFactory provideWebSocketFactory() { + return new WebSocketFactory() { + @Override + public WebSocketConnection createWebSocket() { + SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) + : new UptimeSleepTimer(); + + return new WebSocketConnection("normal", + provideSignalServiceNetworkAccess().getConfiguration(context), + Optional.of(new DynamicCredentialsProvider(context)), + BuildConfig.SIGNAL_AGENT, + pipeListener, + sleepTimer); + } + + @Override + public WebSocketConnection createUnidentifiedWebSocket() { + SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) + : new UptimeSleepTimer(); + + return new WebSocketConnection("unidentified", + provideSignalServiceNetworkAccess().getConfiguration(context), + Optional.absent(), + BuildConfig.SIGNAL_AGENT, + pipeListener, + sleepTimer); + } + }; + } + private static class DynamicCredentialsProvider implements CredentialsProvider { private final Context context; diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.java index 82e4d36582..5d4221b06d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.java @@ -46,9 +46,8 @@ import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException; import org.whispersystems.signalservice.api.crypto.ProfileCipher; import org.whispersystems.signalservice.api.profiles.ProfileAndCredential; import org.whispersystems.signalservice.api.profiles.SignalServiceProfile; -import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; -import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; -import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; +import org.whispersystems.signalservice.api.services.ProfileService; +import org.whispersystems.signalservice.internal.ServiceResponse; import java.io.IOException; import java.util.ArrayList; @@ -59,9 +58,10 @@ import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.Schedulers; /** * Retrieves a users profile and sets the appropriate local fields. @@ -86,7 +86,7 @@ public class RetrieveProfileJob extends BaseJob { /** * Submits the necessary job to refresh the profile of the requested recipient. Works for any * RecipientId, including individuals, groups, or yourself. - * + *

* Identical to {@link #enqueue(Set)})} */ @WorkerThread @@ -169,7 +169,7 @@ public class RetrieveProfileJob extends BaseJob { */ public static void enqueueRoutineFetchIfNecessary(Application application) { if (!SignalStore.registrationValues().isRegistrationComplete() || - !TextSecurePreferences.isPushRegistered(application) || + !TextSecurePreferences.isPushRegistered(application) || TextSecurePreferences.getLocalUuid(application) == null) { Log.i(TAG, "Registration not complete. Skipping."); @@ -204,10 +204,9 @@ public class RetrieveProfileJob extends BaseJob { } private RetrieveProfileJob(@NonNull Set recipientIds) { - this(new Job.Parameters.Builder() - .addConstraint(NetworkConstraint.KEY) - .setMaxAttempts(3) - .build(), + this(new Job.Parameters.Builder().addConstraint(NetworkConstraint.KEY) + .setMaxAttempts(3) + .build(), recipientIds); } @@ -218,11 +217,10 @@ public class RetrieveProfileJob extends BaseJob { @Override public @NonNull Data serialize() { - return new Data.Builder() - .putStringListAsArray(KEY_RECIPIENTS, Stream.of(recipientIds) - .map(RecipientId::serialize) - .toList()) - .build(); + return new Data.Builder().putStringListAsArray(KEY_RECIPIENTS, Stream.of(recipientIds) + .map(RecipientId::serialize) + .toList()) + .build(); } @Override @@ -244,8 +242,6 @@ public class RetrieveProfileJob extends BaseJob { Stopwatch stopwatch = new Stopwatch("RetrieveProfile"); RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context); - Set retries = new HashSet<>(); - Set unregistered = new HashSet<>(); RecipientUtil.ensureUuidsAreAvailable(context, Stream.of(Recipient.resolvedList(recipientIds)) .filter(r -> r.getRegistered() != RecipientDatabase.RegisteredState.NOT_REGISTERED) @@ -254,66 +250,64 @@ public class RetrieveProfileJob extends BaseJob { List recipients = Recipient.resolvedList(recipientIds); stopwatch.split("resolve-ensure"); - List>> futures = Stream.of(recipients) - .filter(Recipient::hasServiceIdentifier) - .map(r -> new Pair<>(r, ProfileUtil.retrieveProfile(context, r, getRequestType(r)))) - .toList(); - stopwatch.split("futures"); + ProfileService profileService = new ProfileService(ApplicationDependencies.getGroupsV2Operations().getProfileOperations(), + ApplicationDependencies.getSignalServiceMessageReceiver(), + ApplicationDependencies.getSignalWebSocket()); - List> profiles = Stream.of(futures) - .map(pair -> { - Recipient recipient = pair.first(); + List>>> requests = Stream.of(recipients) + .filter(Recipient::hasServiceIdentifier) + .map(r -> ProfileUtil.retrieveProfile(context, r, getRequestType(r), profileService).toObservable()) + .toList(); + stopwatch.split("requests"); - try { - ProfileAndCredential profile = pair.second().get(10, TimeUnit.SECONDS); - return new Pair<>(recipient, profile); - } catch (InterruptedException | TimeoutException e) { - retries.add(recipient.getId()); - } catch (ExecutionException e) { - if (e.getCause() instanceof PushNetworkException) { - retries.add(recipient.getId()); - } else if (e.getCause() instanceof NotFoundException) { - Log.w(TAG, "Failed to find a profile for " + recipient.getId()); - if (recipient.isRegistered()) { - unregistered.add(recipient.getId()); - } - } else { - Log.w(TAG, "Failed to retrieve profile for " + recipient.getId()); - } - } - return null; - }) - .withoutNulls() - .toList(); - stopwatch.split("network"); + OperationState operationState = Observable.mergeDelayError(requests) + .observeOn(Schedulers.io(), true) + .scan(new OperationState(), (state, pair) -> { + Recipient recipient = pair.first(); + ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second()); + if (processor.hasResult()) { + state.profiles.add(processor.getResult(recipient)); + process(recipient, processor.getResult()); + } else if (processor.notFound()) { + Log.w(TAG, "Failed to find a profile for " + recipient.getId()); + if (recipient.isRegistered()) { + state.unregistered.add(recipient.getId()); + } + } else if (processor.genericIoError()) { + state.retries.add(recipient.getId()); + } else { + Log.w(TAG, "Failed to retrieve profile for " + recipient.getId()); + } + return state; + }) + .lastOrError() + .blockingGet(); - for (Pair profile : profiles) { - process(profile.first(), profile.second()); - } + stopwatch.split("network-process"); - Set success = SetUtil.difference(recipientIds, retries); + Set success = SetUtil.difference(recipientIds, operationState.retries); recipientDatabase.markProfilesFetched(success, System.currentTimeMillis()); - Map newlyRegistered = Stream.of(profiles) + Map newlyRegistered = Stream.of(operationState.profiles) .map(Pair::first) .filterNot(Recipient::isRegistered) .collect(Collectors.toMap(Recipient::getId, - r -> r.getUuid().transform(UUID::toString).orNull())); + r -> r.getUuid().transform(UUID::toString).orNull())); - if (unregistered.size() > 0 || newlyRegistered.size() > 0) { - Log.i(TAG, "Marking " + newlyRegistered.size() + " users as registered and " + unregistered.size() + " users as unregistered."); - recipientDatabase.bulkUpdatedRegisteredStatus(newlyRegistered, unregistered); + if (operationState.unregistered.size() > 0 || newlyRegistered.size() > 0) { + Log.i(TAG, "Marking " + newlyRegistered.size() + " users as registered and " + operationState.unregistered.size() + " users as unregistered."); + recipientDatabase.bulkUpdatedRegisteredStatus(newlyRegistered, operationState.unregistered); } stopwatch.split("process"); - long keyCount = Stream.of(profiles).map(Pair::first).map(Recipient::getProfileKey).withoutNulls().count(); - Log.d(TAG, String.format(Locale.US, "Started with %d recipient(s). Found %d profile(s), and had keys for %d of them. Will retry %d.", recipients.size(), profiles.size(), keyCount, retries.size())); + long keyCount = Stream.of(operationState.profiles).map(Pair::first).map(Recipient::getProfileKey).withoutNulls().count(); + Log.d(TAG, String.format(Locale.US, "Started with %d recipient(s). Found %d profile(s), and had keys for %d of them. Will retry %d.", recipients.size(), operationState.profiles.size(), keyCount, operationState.retries.size())); stopwatch.stop(TAG); recipientIds.clear(); - recipientIds.addAll(retries); + recipientIds.addAll(operationState.retries); if (recipientIds.size() > 0) { throw new RetryLaterException(); @@ -329,8 +323,8 @@ public class RetrieveProfileJob extends BaseJob { public void onFailure() {} private void process(Recipient recipient, ProfileAndCredential profileAndCredential) { - SignalServiceProfile profile = profileAndCredential.getProfile(); - ProfileKey recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey()); + SignalServiceProfile profile = profileAndCredential.getProfile(); + ProfileKey recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey()); setProfileName(recipient, profile.getName()); setProfileAbout(recipient, profile.getAbout(), profile.getAboutEmoji()); @@ -401,7 +395,7 @@ public class RetrieveProfileJob extends BaseJob { } } else { ProfileCipher profileCipher = new ProfileCipher(profileKey); - boolean verifiedUnidentifiedAccess; + boolean verifiedUnidentifiedAccess; try { verifiedUnidentifiedAccess = profileCipher.verifyUnidentifiedAccess(Base64.decode(unidentifiedAccessVerifier)); @@ -436,9 +430,9 @@ public class RetrieveProfileJob extends BaseJob { String remoteDisplayName = remoteProfileName.toString(); String localDisplayName = localProfileName.toString(); - if (!recipient.isBlocked() && - !recipient.isGroup() && - !recipient.isSelf() && + if (!recipient.isBlocked() && + !recipient.isGroup() && + !recipient.isSelf() && !localDisplayName.isEmpty() && !remoteDisplayName.equals(localDisplayName)) { @@ -446,7 +440,7 @@ public class RetrieveProfileJob extends BaseJob { DatabaseFactory.getSmsDatabase(context).insertProfileNameChangeMessages(recipient, remoteDisplayName, localDisplayName); } else { Log.i(TAG, String.format(Locale.US, "Name changed, but wasn't relevant to write an event. blocked: %s, group: %s, self: %s, firstSet: %s, displayChange: %s", - recipient.isBlocked(), recipient.isGroup(), recipient.isSelf(), localDisplayName.isEmpty(), !remoteDisplayName.equals(localDisplayName))); + recipient.isBlocked(), recipient.isGroup(), recipient.isSelf(), localDisplayName.isEmpty(), !remoteDisplayName.equals(localDisplayName))); } } @@ -494,6 +488,15 @@ public class RetrieveProfileJob extends BaseJob { DatabaseFactory.getRecipientDatabase(context).setCapabilities(recipient.getId(), capabilities); } + /** + * Collective state as responses are processed as they come in. + */ + private static class OperationState { + final Set retries = new HashSet<>(); + final Set unregistered = new HashSet<>(); + final List> profiles = new ArrayList<>(); + } + public static final class Factory implements Job.Factory { @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java index cec20c3b39..acf65d4f88 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java @@ -26,11 +26,10 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels; import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess; import org.thoughtcrime.securesms.util.AppForegroundObserver; import org.thoughtcrime.securesms.util.TextSecurePreferences; -import org.whispersystems.libsignal.InvalidVersionException; import org.whispersystems.libsignal.util.guava.Optional; -import org.whispersystems.signalservice.api.SignalServiceMessagePipe; -import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; +import org.whispersystems.signalservice.api.SignalWebSocket; import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; +import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import java.util.ArrayList; import java.util.List; @@ -48,9 +47,6 @@ public class IncomingMessageObserver { private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0); - private static volatile SignalServiceMessagePipe pipe = null; - private static volatile SignalServiceMessagePipe unidentifiedPipe = null; - private final Application context; private final SignalServiceNetworkAccess networkAccess; private final List decryptionDrainedListeners; @@ -96,7 +92,7 @@ public class IncomingMessageObserver { Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state."); networkDrained = false; decryptionDrained = false; - shutdown(pipe, unidentifiedPipe); + shutdown(); } IncomingMessageObserver.this.notifyAll(); } @@ -176,40 +172,12 @@ public class IncomingMessageObserver { SignalExecutors.BOUNDED.execute(() -> { Log.w(TAG, "Beginning termination."); terminated = true; - shutdown(pipe, unidentifiedPipe); + shutdown(); }); } - private void shutdown(@Nullable SignalServiceMessagePipe pipe, @Nullable SignalServiceMessagePipe unidentifiedPipe) { - try { - if (pipe != null) { - Log.w(TAG, "Shutting down normal pipe."); - pipe.shutdown(); - } else { - Log.w(TAG, "No need to shutdown normal pipe, it doesn't exist."); - } - } catch (Throwable t) { - Log.w(TAG, "Closing normal pipe failed!", t); - } - - try { - if (unidentifiedPipe != null) { - Log.w(TAG, "Shutting down unidentified pipe."); - unidentifiedPipe.shutdown(); - } else { - Log.w(TAG, "No need to shutdown unidentified pipe, it doesn't exist."); - } - } catch (Throwable t) { - Log.w(TAG, "Closing unidentified pipe failed!", t); - } - } - - public static @Nullable SignalServiceMessagePipe getPipe() { - return pipe; - } - - public static @Nullable SignalServiceMessagePipe getUnidentifiedPipe() { - return unidentifiedPipe; + private void shutdown() { + ApplicationDependencies.getSignalWebSocket().disconnect(); } private class MessageRetrievalThread extends Thread implements Thread.UncaughtExceptionHandler { @@ -227,19 +195,14 @@ public class IncomingMessageObserver { waitForConnectionNecessary(); Log.i(TAG, "Making websocket connection...."); - SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); - - pipe = receiver.createMessagePipe(); - unidentifiedPipe = receiver.createUnidentifiedMessagePipe(); - - SignalServiceMessagePipe localPipe = pipe; - SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe; + SignalWebSocket signalWebSocket = ApplicationDependencies.getSignalWebSocket(); + signalWebSocket.connect(); try { while (isConnectionNecessary()) { try { Log.d(TAG, "Reading message..."); - Optional result = localPipe.readOrEmpty(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, envelope -> { + Optional result = signalWebSocket.readOrEmpty(TimeUnit.MINUTES.toMillis(REQUEST_TIMEOUT_MINUTES), envelope -> { Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp()); try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { processor.processEnvelope(envelope); @@ -251,6 +214,9 @@ public class IncomingMessageObserver { networkDrained = true; ApplicationDependencies.getJobManager().add(new PushDecryptDrainedJob()); } + } catch (WebSocketUnavailableException e) { + Log.i(TAG, "Pipe unexpectedly unavailable, connecting"); + signalWebSocket.connect(); } catch (TimeoutException e) { Log.w(TAG, "Application level read timeout..."); } @@ -259,7 +225,7 @@ public class IncomingMessageObserver { Log.w(TAG, e); } finally { Log.w(TAG, "Shutting down pipe..."); - shutdown(localPipe, unidentifiedLocalPipe); + shutdown(); } Log.i(TAG, "Looping..."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java b/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java index d4df57a4bf..78e37007b4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/WebsocketStrategy.java @@ -6,28 +6,25 @@ import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; -import org.whispersystems.libsignal.InvalidVersionException; import org.whispersystems.libsignal.util.guava.Optional; -import org.whispersystems.signalservice.api.SignalServiceMessagePipe; -import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; +import org.whispersystems.signalservice.api.SignalWebSocket; import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; import java.io.IOException; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; class WebsocketStrategy extends MessageRetrievalStrategy { private static final String TAG = Log.tag(WebsocketStrategy.class); - private final SignalServiceMessageReceiver receiver; - private final JobManager jobManager; + private final SignalWebSocket signalWebSocket; + private final JobManager jobManager; public WebsocketStrategy() { - this.receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); - this.jobManager = ApplicationDependencies.getJobManager(); + this.signalWebSocket = ApplicationDependencies.getSignalWebSocket(); + this.jobManager = ApplicationDependencies.getJobManager(); } @Override @@ -55,15 +52,15 @@ class WebsocketStrategy extends MessageRetrievalStrategy { } private @NonNull Set drainWebsocket(long timeout, long startTime) throws IOException { - SignalServiceMessagePipe pipe = receiver.createMessagePipe(); QueueFindingJobListener queueListener = new QueueFindingJobListener(); jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener); try { + signalWebSocket.connect(); while (shouldContinue()) { try { - Optional result = pipe.readOrEmpty(timeout, TimeUnit.MILLISECONDS, envelope -> { + Optional result = signalWebSocket.readOrEmpty(timeout, envelope -> { Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime)); try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) { processor.processEnvelope(envelope); @@ -79,7 +76,7 @@ class WebsocketStrategy extends MessageRetrievalStrategy { } } } finally { - pipe.shutdown(); + signalWebSocket.disconnect(); jobManager.removeListener(queueListener); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/payments/confirm/ConfirmPaymentRepository.java b/app/src/main/java/org/thoughtcrime/securesms/payments/confirm/ConfirmPaymentRepository.java index a3f0e23eec..9fbbd4851e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/payments/confirm/ConfirmPaymentRepository.java +++ b/app/src/main/java/org/thoughtcrime/securesms/payments/confirm/ConfirmPaymentRepository.java @@ -22,7 +22,6 @@ import org.whispersystems.signalservice.api.payments.Money; import java.io.IOException; import java.util.UUID; -import java.util.concurrent.ExecutionException; final class ConfirmPaymentRepository { @@ -54,7 +53,7 @@ final class ConfirmPaymentRepository { recipientId = payee.requireRecipientId(); try { mobileCoinPublicAddress = ProfileUtil.getAddressForRecipient(Recipient.resolved(recipientId)); - } catch (InterruptedException | ExecutionException e) { + } catch (IOException e) { Log.w(TAG, "Failed to get address for recipient " + recipientId); consumer.accept(new ConfirmPaymentResult.Error()); return; diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java b/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java index 2c7376f71b..a23c884c0f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/ProfileUtil.java @@ -16,7 +16,6 @@ import org.thoughtcrime.securesms.database.DatabaseFactory; import org.thoughtcrime.securesms.database.RecipientDatabase; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.payments.MobileCoinPublicAddress; import org.thoughtcrime.securesms.payments.MobileCoinPublicAddressProfileUtil; import org.thoughtcrime.securesms.payments.PaymentsAddressException; @@ -27,10 +26,9 @@ import org.thoughtcrime.securesms.recipients.RecipientUtil; import org.whispersystems.libsignal.IdentityKey; import org.whispersystems.libsignal.IdentityKeyPair; import org.whispersystems.libsignal.InvalidKeyException; +import org.whispersystems.libsignal.util.Pair; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceAccountManager; -import org.whispersystems.signalservice.api.SignalServiceMessagePipe; -import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException; import org.whispersystems.signalservice.api.crypto.ProfileCipher; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; @@ -38,18 +36,14 @@ import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair; import org.whispersystems.signalservice.api.profiles.ProfileAndCredential; import org.whispersystems.signalservice.api.profiles.SignalServiceProfile; import org.whispersystems.signalservice.api.push.SignalServiceAddress; -import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; -import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; +import org.whispersystems.signalservice.api.services.ProfileService; import org.whispersystems.signalservice.api.util.StreamDetails; +import org.whispersystems.signalservice.internal.ServiceResponse; import org.whispersystems.signalservice.internal.push.SignalServiceProtos; -import org.whispersystems.signalservice.internal.util.concurrent.CascadingFuture; -import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Single; /** * Aids in the retrieval and decryption of profiles. @@ -67,40 +61,26 @@ public final class ProfileUtil { @NonNull SignalServiceProfile.RequestType requestType) throws IOException { - try { - return retrieveProfile(context, recipient, requestType).get(10, TimeUnit.SECONDS); - } catch (ExecutionException e) { - if (e.getCause() instanceof PushNetworkException) { - throw (PushNetworkException) e.getCause(); - } else if (e.getCause() instanceof NotFoundException) { - throw (NotFoundException) e.getCause(); - } else { - throw new IOException(e); - } - } catch (InterruptedException | TimeoutException e) { - throw new PushNetworkException(e); - } + ProfileService profileService = new ProfileService(ApplicationDependencies.getGroupsV2Operations().getProfileOperations(), + ApplicationDependencies.getSignalServiceMessageReceiver(), + ApplicationDependencies.getSignalWebSocket()); + + Pair> response = retrieveProfile(context, recipient, requestType, profileService).blockingGet(); + return new ProfileService.ProfileResponseProcessor(response.second()).getResultOrThrow(); } - public static @NonNull ListenableFuture retrieveProfile(@NonNull Context context, - @NonNull Recipient recipient, - @NonNull SignalServiceProfile.RequestType requestType) + public static Single>> retrieveProfile(@NonNull Context context, + @NonNull Recipient recipient, + @NonNull SignalServiceProfile.RequestType requestType, + @NonNull ProfileService profileService) { SignalServiceAddress address = toSignalServiceAddress(context, recipient); Optional unidentifiedAccess = getUnidentifiedAccess(context, recipient); Optional profileKey = ProfileKeyUtil.profileKeyOptional(recipient.getProfileKey()); - if (unidentifiedAccess.isPresent()) { - return new CascadingFuture<>(Arrays.asList(() -> getPipeRetrievalFuture(address, profileKey, unidentifiedAccess, requestType), - () -> getSocketRetrievalFuture(address, profileKey, unidentifiedAccess, requestType), - () -> getPipeRetrievalFuture(address, profileKey, Optional.absent(), requestType), - () -> getSocketRetrievalFuture(address, profileKey, Optional.absent(), requestType)), - e -> !(e instanceof NotFoundException)); - } else { - return new CascadingFuture<>(Arrays.asList(() -> getPipeRetrievalFuture(address, profileKey, Optional.absent(), requestType), - () -> getSocketRetrievalFuture(address, profileKey, Optional.absent(), requestType)), - e -> !(e instanceof NotFoundException)); - } + return profileService.getProfile(address, profileKey, unidentifiedAccess, requestType) + .map(p -> new Pair<>(recipient, p)) + .onErrorReturn(t -> new Pair<>(recipient, ServiceResponse.forUnknownError(t))); } public static @Nullable String decryptString(@NonNull ProfileKey profileKey, @Nullable byte[] encryptedString) @@ -126,7 +106,7 @@ public final class ProfileUtil { @WorkerThread public static @NonNull MobileCoinPublicAddress getAddressForRecipient(@NonNull Recipient recipient) - throws InterruptedException, ExecutionException, PaymentsAddressException + throws IOException, PaymentsAddressException { ProfileKey profileKey; try { @@ -135,7 +115,7 @@ public final class ProfileUtil { Log.w(TAG, "Profile key not available for " + recipient.getId()); throw new PaymentsAddressException(PaymentsAddressException.Code.NO_PROFILE_KEY); } - ProfileAndCredential profileAndCredential = ProfileUtil.retrieveProfile(ApplicationDependencies.getApplication(), recipient, SignalServiceProfile.RequestType.PROFILE).get(); + ProfileAndCredential profileAndCredential = ProfileUtil.retrieveProfileSync(ApplicationDependencies.getApplication(), recipient, SignalServiceProfile.RequestType.PROFILE); SignalServiceProfile profile = profileAndCredential.getProfile(); byte[] encryptedPaymentsAddress = profile.getPaymentAddress(); @@ -277,32 +257,6 @@ public final class ProfileUtil { } } - private static @NonNull ListenableFuture getPipeRetrievalFuture(@NonNull SignalServiceAddress address, - @NonNull Optional profileKey, - @NonNull Optional unidentifiedAccess, - @NonNull SignalServiceProfile.RequestType requestType) - throws IOException - { - SignalServiceMessagePipe authPipe = IncomingMessageObserver.getPipe(); - SignalServiceMessagePipe unidentifiedPipe = IncomingMessageObserver.getUnidentifiedPipe(); - SignalServiceMessagePipe pipe = unidentifiedPipe != null && unidentifiedAccess.isPresent() ? unidentifiedPipe - : authPipe; - if (pipe != null) { - return pipe.getProfile(address, profileKey, unidentifiedAccess, requestType); - } - - throw new IOException("No pipe available!"); - } - - private static @NonNull ListenableFuture getSocketRetrievalFuture(@NonNull SignalServiceAddress address, - @NonNull Optional profileKey, - @NonNull Optional unidentifiedAccess, - @NonNull SignalServiceProfile.RequestType requestType) - { - SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver(); - return receiver.retrieveProfile(address, profileKey, unidentifiedAccess, requestType); - } - private static Optional getUnidentifiedAccess(@NonNull Context context, @NonNull Recipient recipient) { Optional unidentifiedAccess = UnidentifiedAccessUtil.getAccessFor(context, recipient, false); diff --git a/core-util/src/main/java/org/signal/core/util/concurrent/SignalExecutors.java b/core-util/src/main/java/org/signal/core/util/concurrent/SignalExecutors.java index bf34dbfbca..760b77f778 100644 --- a/core-util/src/main/java/org/signal/core/util/concurrent/SignalExecutors.java +++ b/core-util/src/main/java/org/signal/core/util/concurrent/SignalExecutors.java @@ -16,9 +16,10 @@ import java.util.concurrent.atomic.AtomicInteger; public final class SignalExecutors { - public static final ExecutorService UNBOUNDED = Executors.newCachedThreadPool(new NumberedThreadFactory("signal-unbounded")); - public static final ExecutorService BOUNDED = Executors.newFixedThreadPool(getIdealThreadCount(), new NumberedThreadFactory("signal-bounded")); - public static final ExecutorService SERIAL = Executors.newSingleThreadExecutor(new NumberedThreadFactory("signal-serial")); + public static final ExecutorService UNBOUNDED = Executors.newCachedThreadPool(new NumberedThreadFactory("signal-unbounded")); + public static final ExecutorService BOUNDED = Executors.newFixedThreadPool(getIdealThreadCount(), new NumberedThreadFactory("signal-bounded")); + public static final ExecutorService SERIAL = Executors.newSingleThreadExecutor(new NumberedThreadFactory("signal-serial")); + public static final ExecutorService BOUNDED_IO = newCachedBoundedExecutor("signal-bounded-io", 1, 32); private SignalExecutors() {} @@ -32,10 +33,10 @@ public final class SignalExecutors { * ThreadPoolExecutor will only create a new thread if the provided queue returns false from * offer(). That means if you give it an unbounded queue, it'll only ever create 1 thread, no * matter how long the queue gets. - * + *

* But if you bound the queue and submit more runnables than there are threads, your task is * rejected and throws an exception. - * + *

* So we make a queue that will always return false if it's non-empty to ensure new threads get * created. Then, if a task gets rejected, we simply add it to the queue. */ diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java deleted file mode 100644 index 2350e3befb..0000000000 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Copyright (C) 2014-2016 Open Whisper Systems - * - * Licensed according to the LICENSE file in this repository. - */ - -package org.whispersystems.signalservice.api; - -import com.google.protobuf.ByteString; - -import org.signal.zkgroup.profiles.ClientZkProfileOperations; -import org.signal.zkgroup.profiles.ProfileKey; -import org.signal.zkgroup.profiles.ProfileKeyCredential; -import org.signal.zkgroup.profiles.ProfileKeyCredentialRequest; -import org.signal.zkgroup.profiles.ProfileKeyCredentialRequestContext; -import org.signal.zkgroup.profiles.ProfileKeyVersion; -import org.whispersystems.libsignal.InvalidVersionException; -import org.whispersystems.libsignal.logging.Log; -import org.whispersystems.libsignal.util.guava.Optional; -import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; -import org.whispersystems.signalservice.api.profiles.ProfileAndCredential; -import org.whispersystems.signalservice.api.profiles.SignalServiceProfile; -import org.whispersystems.signalservice.api.push.SignalServiceAddress; -import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; -import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; -import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException; -import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; -import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; -import org.whispersystems.signalservice.api.util.CredentialsProvider; -import org.whispersystems.signalservice.internal.push.AttachmentV2UploadAttributes; -import org.whispersystems.signalservice.internal.push.AttachmentV3UploadAttributes; -import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; -import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; -import org.whispersystems.signalservice.internal.push.ProofRequiredResponse; -import org.whispersystems.signalservice.internal.push.SendMessageResponse; -import org.whispersystems.signalservice.internal.util.Hex; -import org.whispersystems.signalservice.internal.util.JsonUtil; -import org.whispersystems.signalservice.internal.util.Util; -import org.whispersystems.signalservice.internal.util.concurrent.FutureTransformers; -import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; -import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; -import org.whispersystems.signalservice.internal.websocket.WebsocketResponse; -import org.whispersystems.util.Base64; - -import java.io.IOException; -import java.security.SecureRandom; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketRequestMessage; -import static org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketResponseMessage; - -/** - * A SignalServiceMessagePipe represents a dedicated connection - * to the Signal Service, which the server can push messages - * down through. - */ -public class SignalServiceMessagePipe { - - private static final String TAG = SignalServiceMessagePipe.class.getName(); - - private static final String SERVER_DELIVERED_TIMESTAMP_HEADER = "X-Signal-Timestamp"; - - private final WebSocketConnection websocket; - private final Optional credentialsProvider; - private final ClientZkProfileOperations clientZkProfile; - - SignalServiceMessagePipe(WebSocketConnection websocket, - Optional credentialsProvider, - ClientZkProfileOperations clientZkProfile) - { - this.websocket = websocket; - this.credentialsProvider = credentialsProvider; - this.clientZkProfile = clientZkProfile; - - this.websocket.connect(); - } - - /** - * A blocking call that reads a message off the pipe. When this - * call returns, the message has been acknowledged and will not - * be retransmitted. - * - * @param timeout The timeout to wait for. - * @param unit The timeout time unit. - * @return A new message. - * - * @throws InvalidVersionException - * @throws IOException - * @throws TimeoutException - */ - public SignalServiceEnvelope read(long timeout, TimeUnit unit) - throws InvalidVersionException, IOException, TimeoutException - { - return read(timeout, unit, new NullMessagePipeCallback()); - } - - /** - * A blocking call that reads a message off the pipe (see {@link #read(long, java.util.concurrent.TimeUnit)} - * - * Unlike {@link #read(long, java.util.concurrent.TimeUnit)}, this method allows you - * to specify a callback that will be called before the received message is acknowledged. - * This allows you to write the received message to durable storage before acknowledging - * receipt of it to the server. - * - * @param timeout The timeout to wait for. - * @param unit The timeout time unit. - * @param callback A callback that will be called before the message receipt is - * acknowledged to the server. - * @return The message read (same as the message sent through the callback). - * @throws TimeoutException - * @throws IOException - * @throws InvalidVersionException - */ - public SignalServiceEnvelope read(long timeout, TimeUnit unit, MessagePipeCallback callback) - throws TimeoutException, IOException, InvalidVersionException - { - while (true) { - Optional envelope = readOrEmpty(timeout, unit, callback); - - if (envelope.isPresent()) { - return envelope.get(); - } - } - } - - /** - * Similar to {@link #read(long, TimeUnit, MessagePipeCallback)}, except this will return - * {@link Optional#absent()} when an empty response is hit, which indicates the websocket is - * empty. - * - * Important: The empty response will only be hit once for each connection. That means if you get - * an empty response and call readOrEmpty() again on the same instance, you will not get an empty - * response, and instead will block until you get an actual message. This will, however, reset if - * connection breaks (if, for instance, you lose and regain network). - */ - public Optional readOrEmpty(long timeout, TimeUnit unit, MessagePipeCallback callback) - throws TimeoutException, IOException - { - if (!credentialsProvider.isPresent()) { - throw new IllegalArgumentException("You can't read messages if you haven't specified credentials"); - } - - while (true) { - WebSocketRequestMessage request = websocket.readRequest(unit.toMillis(timeout)); - WebSocketResponseMessage response = createWebSocketResponse(request); - try { - if (isSignalServiceEnvelope(request)) { - Optional timestampHeader = findHeader(request, SERVER_DELIVERED_TIMESTAMP_HEADER); - long timestamp = 0; - - if (timestampHeader.isPresent()) { - try { - timestamp = Long.parseLong(timestampHeader.get()); - } catch (NumberFormatException e) { - Log.w(TAG, "Failed to parse " + SERVER_DELIVERED_TIMESTAMP_HEADER); - } - } - - SignalServiceEnvelope envelope = new SignalServiceEnvelope(request.getBody().toByteArray(), timestamp); - - callback.onMessage(envelope); - return Optional.of(envelope); - } else if (isSocketEmptyRequest(request)) { - return Optional.absent(); - } - } finally { - websocket.sendResponse(response); - } - } - } - - public Future sendToGroup(byte[] body, byte[] joinedUnidentifiedAccess, long timestamp, boolean online) throws IOException { - List headers = new LinkedList() {{ - add("content-type:application/vnd.signal-messenger.mrm"); - add("Unidentified-Access-Key:" + Base64.encodeBytes(joinedUnidentifiedAccess)); - }}; - - String path = String.format(Locale.US, "/v1/messages/multi_recipient?ts=%s&online=%s", timestamp, online); - - WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() - .setId(new SecureRandom().nextLong()) - .setVerb("PUT") - .setPath(path) - .addAllHeaders(headers) - .setBody(ByteString.copyFrom(body)) - .build(); - - ListenableFuture response = websocket.sendRequest(requestMessage); - - return FutureTransformers.map(response, value -> { - if (value.getStatus() == 200) { - return JsonUtil.fromJson(value.getBody(), SendGroupMessageResponse.class); - } else { - throw new NonSuccessfulResponseCodeException(value.getStatus()); - } - }); - } - - public Future send(OutgoingPushMessageList list, Optional unidentifiedAccess) throws IOException { - List headers = new LinkedList() {{ - add("content-type:application/json"); - }}; - - if (unidentifiedAccess.isPresent()) { - headers.add("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey())); - } - - WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() - .setId(new SecureRandom().nextLong()) - .setVerb("PUT") - .setPath(String.format("/v1/messages/%s", list.getDestination())) - .addAllHeaders(headers) - .setBody(ByteString.copyFrom(JsonUtil.toJson(list).getBytes())) - .build(); - - ListenableFuture response = websocket.sendRequest(requestMessage); - - return FutureTransformers.map(response, value -> { - if (value.getStatus() == 404) { - throw new UnregisteredUserException(list.getDestination(), new NotFoundException("not found")); - } else if (value.getStatus() == 428) { - ProofRequiredResponse proofResponse = JsonUtil.fromJson(value.getBody(), ProofRequiredResponse.class); - String retryAfterRaw = value.getHeader("Retry-After"); - long retryAfter = Util.parseInt(retryAfterRaw, -1); - - throw new ProofRequiredException(proofResponse, retryAfter); - } else if (value.getStatus() == 508) { - throw new ServerRejectedException(); - } else if (value.getStatus() < 200 || value.getStatus() >= 300) { - throw new IOException("Non-successful response: " + value.getStatus()); - } - - if (Util.isEmpty(value.getBody())) { - return new SendMessageResponse(false); - } else { - return JsonUtil.fromJson(value.getBody(), SendMessageResponse.class); - } - }); - } - - public ListenableFuture getProfile(SignalServiceAddress address, - Optional profileKey, - Optional unidentifiedAccess, - SignalServiceProfile.RequestType requestType) - throws IOException - { - List headers = new LinkedList<>(); - - if (unidentifiedAccess.isPresent()) { - headers.add("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey())); - } - - Optional uuid = address.getUuid(); - SecureRandom random = new SecureRandom(); - ProfileKeyCredentialRequestContext requestContext = null; - - WebSocketRequestMessage.Builder builder = WebSocketRequestMessage.newBuilder() - .setId(random.nextLong()) - .setVerb("GET") - .addAllHeaders(headers); - - if (uuid.isPresent() && profileKey.isPresent()) { - UUID target = uuid.get(); - ProfileKeyVersion profileKeyIdentifier = profileKey.get().getProfileKeyVersion(target); - String version = profileKeyIdentifier.serialize(); - - if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL) { - requestContext = clientZkProfile.createProfileKeyCredentialRequestContext(random, target, profileKey.get()); - - ProfileKeyCredentialRequest request = requestContext.getRequest(); - String credentialRequest = Hex.toStringCondensed(request.serialize()); - - builder.setPath(String.format("/v1/profile/%s/%s/%s", target, version, credentialRequest)); - } else { - builder.setPath(String.format("/v1/profile/%s/%s", target, version)); - } - } else { - builder.setPath(String.format("/v1/profile/%s", address.getIdentifier())); - } - - final ProfileKeyCredentialRequestContext finalRequestContext = requestContext; - WebSocketRequestMessage requestMessage = builder.build(); - - return FutureTransformers.map(websocket.sendRequest(requestMessage), response -> { - if (response.getStatus() == 404) { - throw new NotFoundException("Not found"); - } else if (response.getStatus() < 200 || response.getStatus() >= 300) { - throw new NonSuccessfulResponseCodeException(response.getStatus(), "Non-successful response: " + response.getStatus()); - } - - SignalServiceProfile signalServiceProfile = JsonUtil.fromJson(response.getBody(), SignalServiceProfile.class); - ProfileKeyCredential profileKeyCredential = finalRequestContext != null && signalServiceProfile.getProfileKeyCredentialResponse() != null - ? clientZkProfile.receiveProfileKeyCredential(finalRequestContext, signalServiceProfile.getProfileKeyCredentialResponse()) - : null; - - return new ProfileAndCredential(signalServiceProfile, requestType, Optional.fromNullable(profileKeyCredential)); - }); - } - - public AttachmentV2UploadAttributes getAttachmentV2UploadAttributes() throws IOException { - try { - WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() - .setId(new SecureRandom().nextLong()) - .setVerb("GET") - .setPath("/v2/attachments/form/upload") - .build(); - - WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); - - if (response.getStatus() < 200 || response.getStatus() >= 300) { - throw new IOException("Non-successful response: " + response.getStatus()); - } - - return JsonUtil.fromJson(response.getBody(), AttachmentV2UploadAttributes.class); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new IOException(e); - } - } - - public AttachmentV3UploadAttributes getAttachmentV3UploadAttributes() throws IOException { - try { - WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() - .setId(new SecureRandom().nextLong()) - .setVerb("GET") - .setPath("/v3/attachments/form/upload") - .build(); - - WebsocketResponse response = websocket.sendRequest(requestMessage).get(10, TimeUnit.SECONDS); - - if (response.getStatus() < 200 || response.getStatus() >= 300) { - throw new IOException("Non-successful response: " + response.getStatus()); - } - - return JsonUtil.fromJson(response.getBody(), AttachmentV3UploadAttributes.class); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new IOException(e); - } - } - - /** - * Close this connection to the server. - */ - public void shutdown() { - websocket.disconnect(); - } - - private boolean isSignalServiceEnvelope(WebSocketRequestMessage message) { - return "PUT".equals(message.getVerb()) && "/api/v1/message".equals(message.getPath()); - } - - private boolean isSocketEmptyRequest(WebSocketRequestMessage message) { - return "PUT".equals(message.getVerb()) && "/api/v1/queue/empty".equals(message.getPath()); - } - - private WebSocketResponseMessage createWebSocketResponse(WebSocketRequestMessage request) { - if (isSignalServiceEnvelope(request)) { - return WebSocketResponseMessage.newBuilder() - .setId(request.getId()) - .setStatus(200) - .setMessage("OK") - .build(); - } else { - return WebSocketResponseMessage.newBuilder() - .setId(request.getId()) - .setStatus(400) - .setMessage("Unknown") - .build(); - } - } - - private static Optional findHeader(WebSocketRequestMessage message, String targetHeader) { - if (message.getHeadersCount() == 0) { - return Optional.absent(); - } - - for (String header : message.getHeadersList()) { - if (header.startsWith(targetHeader)) { - String[] split = header.split(":"); - if (split.length == 2 && split[0].trim().toLowerCase().equals(targetHeader.toLowerCase())) { - return Optional.of(split[1].trim()); - } - } - } - - return Optional.absent(); - } - - /** - * For receiving a callback when a new message has been - * received. - */ - public interface MessagePipeCallback { - void onMessage(SignalServiceEnvelope envelope); - } - - private static class NullMessagePipeCallback implements MessagePipeCallback { - @Override - public void onMessage(SignalServiceEnvelope envelope) {} - } - -} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java index 412e7fb2eb..5cc9b06509 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java @@ -31,11 +31,9 @@ import org.whispersystems.signalservice.internal.push.PushServiceSocket; import org.whispersystems.signalservice.internal.push.SignalServiceEnvelopeEntity; import org.whispersystems.signalservice.internal.push.SignalServiceMessagesResult; import org.whispersystems.signalservice.internal.sticker.StickerProtos; -import org.whispersystems.signalservice.internal.util.StaticCredentialsProvider; import org.whispersystems.signalservice.internal.util.Util; import org.whispersystems.signalservice.internal.util.concurrent.FutureTransformers; import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; -import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; import java.io.ByteArrayOutputStream; import java.io.File; @@ -63,28 +61,6 @@ public class SignalServiceMessageReceiver { private final SleepTimer sleepTimer; private final ClientZkProfileOperations clientZkProfileOperations; - /** - * Construct a SignalServiceMessageReceiver. - * - * @param urls The URL of the Signal Service. - * @param uuid The Signal Service UUID. - * @param e164 The Signal Service phone number. - * @param password The Signal Service user password. - * @param signalingKey The 52 byte signaling key assigned to this user at registration. - */ - public SignalServiceMessageReceiver(SignalServiceConfiguration urls, - UUID uuid, - String e164, - String password, - String signalAgent, - ConnectivityListener listener, - SleepTimer timer, - ClientZkProfileOperations clientZkProfileOperations, - boolean automaticNetworkRetry) - { - this(urls, new StaticCredentialsProvider(uuid, e164, password), signalAgent, listener, timer, clientZkProfileOperations, automaticNetworkRetry); - } - /** * Construct a SignalServiceMessageReceiver. * @@ -229,37 +205,6 @@ public class SignalServiceMessageReceiver { return new SignalServiceStickerManifest(pack.getTitle(), pack.getAuthor(), cover, stickers); } - /** - * Creates a pipe for receiving SignalService messages. - * - * Callers must call {@link SignalServiceMessagePipe#shutdown()} when finished with the pipe. - * - * @return A SignalServiceMessagePipe for receiving Signal Service messages. - */ - public SignalServiceMessagePipe createMessagePipe() { - WebSocketConnection webSocket = new WebSocketConnection(urls.getSignalServiceUrls()[0].getUrl(), - urls.getSignalServiceUrls()[0].getTrustStore(), - Optional.of(credentialsProvider), signalAgent, connectivityListener, - sleepTimer, - urls.getNetworkInterceptors(), - urls.getDns(), - urls.getSignalProxy()); - - return new SignalServiceMessagePipe(webSocket, Optional.of(credentialsProvider), clientZkProfileOperations); - } - - public SignalServiceMessagePipe createUnidentifiedMessagePipe() { - WebSocketConnection webSocket = new WebSocketConnection(urls.getSignalServiceUrls()[0].getUrl(), - urls.getSignalServiceUrls()[0].getTrustStore(), - Optional.absent(), signalAgent, connectivityListener, - sleepTimer, - urls.getNetworkInterceptors(), - urls.getDns(), - urls.getSignalProxy()); - - return new SignalServiceMessagePipe(webSocket, Optional.of(credentialsProvider), clientZkProfileOperations); - } - public List retrieveMessages() throws IOException { return retrieveMessages(new NullMessageReceivedCallback()); } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index 4c2bcc5ec8..3fdce881cc 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -6,10 +6,8 @@ package org.whispersystems.signalservice.api; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import org.signal.libsignal.metadata.certificate.SenderCertificate; -import org.signal.libsignal.metadata.protocol.UnidentifiedSenderMessageContent; import org.signal.zkgroup.profiles.ClientZkProfileOperations; import org.whispersystems.libsignal.InvalidKeyException; import org.whispersystems.libsignal.NoSessionException; @@ -39,7 +37,6 @@ import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPoin import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream; import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; import org.whispersystems.signalservice.api.messages.SignalServiceGroup; import org.whispersystems.signalservice.api.messages.SignalServiceGroupContext; import org.whispersystems.signalservice.api.messages.SignalServiceGroupV2; @@ -72,23 +69,26 @@ import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredExcepti import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; +import org.whispersystems.signalservice.api.services.AttachmentService; +import org.whispersystems.signalservice.api.services.MessagingService; import org.whispersystems.signalservice.api.util.CredentialsProvider; import org.whispersystems.signalservice.api.util.Uint64RangeException; import org.whispersystems.signalservice.api.util.Uint64Util; import org.whispersystems.signalservice.api.util.UuidUtil; +import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration; import org.whispersystems.signalservice.internal.crypto.PaddingInputStream; import org.whispersystems.signalservice.internal.push.AttachmentV2UploadAttributes; import org.whispersystems.signalservice.internal.push.AttachmentV3UploadAttributes; import org.whispersystems.signalservice.internal.push.GroupMismatchedDevices; import org.whispersystems.signalservice.internal.push.GroupStaleDevices; -import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; import org.whispersystems.signalservice.internal.push.MismatchedDevices; import org.whispersystems.signalservice.internal.push.OutgoingPushMessage; import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; import org.whispersystems.signalservice.internal.push.ProvisioningProtos; import org.whispersystems.signalservice.internal.push.PushAttachmentData; import org.whispersystems.signalservice.internal.push.PushServiceSocket; +import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; import org.whispersystems.signalservice.internal.push.SendMessageResponse; import org.whispersystems.signalservice.internal.push.SignalServiceProtos.AttachmentPointer; import org.whispersystems.signalservice.internal.push.SignalServiceProtos.CallMessage; @@ -110,7 +110,6 @@ import org.whispersystems.signalservice.internal.push.http.AttachmentCipherOutpu import org.whispersystems.signalservice.internal.push.http.CancelationSignal; import org.whispersystems.signalservice.internal.push.http.PartialSendCompleteListener; import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; -import org.whispersystems.signalservice.internal.util.StaticCredentialsProvider; import org.whispersystems.signalservice.internal.util.Util; import org.whispersystems.util.Base64; import org.whispersystems.util.ByteArrayUtil; @@ -132,10 +131,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -149,45 +145,18 @@ public class SignalServiceMessageSender { private static final int RETRY_COUNT = 4; - private final PushServiceSocket socket; - private final SignalServiceProtocolStore store; - private final SignalSessionLock sessionLock; - private final SignalServiceAddress localAddress; - private final Optional eventListener; + private final PushServiceSocket socket; + private final SignalServiceProtocolStore store; + private final SignalSessionLock sessionLock; + private final SignalServiceAddress localAddress; + private final Optional eventListener; - private final AtomicReference> pipe; - private final AtomicReference> unidentifiedPipe; - private final AtomicBoolean isMultiDevice; + private final AttachmentService attachmentService; + private final MessagingService messagingService; + private final AtomicBoolean isMultiDevice; - private final ExecutorService executor; - private final long maxEnvelopeSize; - - /** - * Construct a SignalServiceMessageSender. - * - * @param urls The URL of the Signal Service. - * @param uuid The Signal Service UUID. - * @param e164 The Signal Service phone number. - * @param password The Signal Service user password. - * @param store The SignalProtocolStore. - * @param eventListener An optional event listener, which fires whenever sessions are - * setup or torn down for a recipient. - */ - public SignalServiceMessageSender(SignalServiceConfiguration urls, - UUID uuid, String e164, String password, - SignalServiceProtocolStore store, - SignalSessionLock sessionLock, - String signalAgent, - boolean isMultiDevice, - Optional pipe, - Optional unidentifiedPipe, - Optional eventListener, - ClientZkProfileOperations clientZkProfileOperations, - ExecutorService executor, - boolean automaticNetworkRetry) - { - this(urls, new StaticCredentialsProvider(uuid, e164, password), store, sessionLock, signalAgent, isMultiDevice, pipe, unidentifiedPipe, eventListener, clientZkProfileOperations, executor, 0, automaticNetworkRetry); - } + private final ExecutorService executor; + private final long maxEnvelopeSize; public SignalServiceMessageSender(SignalServiceConfiguration urls, CredentialsProvider credentialsProvider, @@ -195,24 +164,23 @@ public class SignalServiceMessageSender { SignalSessionLock sessionLock, String signalAgent, boolean isMultiDevice, - Optional pipe, - Optional unidentifiedPipe, + SignalWebSocket signalWebSocket, Optional eventListener, ClientZkProfileOperations clientZkProfileOperations, ExecutorService executor, long maxEnvelopeSize, boolean automaticNetworkRetry) { - this.socket = new PushServiceSocket(urls, credentialsProvider, signalAgent, clientZkProfileOperations, automaticNetworkRetry); - this.store = store; - this.sessionLock = sessionLock; - this.localAddress = new SignalServiceAddress(credentialsProvider.getUuid(), credentialsProvider.getE164()); - this.pipe = new AtomicReference<>(pipe); - this.unidentifiedPipe = new AtomicReference<>(unidentifiedPipe); - this.isMultiDevice = new AtomicBoolean(isMultiDevice); - this.eventListener = eventListener; - this.executor = executor != null ? executor : Executors.newSingleThreadExecutor(); - this.maxEnvelopeSize = maxEnvelopeSize; + this.socket = new PushServiceSocket(urls, credentialsProvider, signalAgent, clientZkProfileOperations, automaticNetworkRetry); + this.store = store; + this.sessionLock = sessionLock; + this.localAddress = new SignalServiceAddress(credentialsProvider.getUuid(), credentialsProvider.getE164()); + this.attachmentService = new AttachmentService(signalWebSocket); + this.messagingService = new MessagingService(signalWebSocket); + this.isMultiDevice = new AtomicBoolean(isMultiDevice); + this.eventListener = eventListener; + this.executor = executor != null ? executor : Executors.newSingleThreadExecutor(); + this.maxEnvelopeSize = maxEnvelopeSize; } /** @@ -538,9 +506,7 @@ public class SignalServiceMessageSender { socket.cancelInFlightRequests(); } - public void update(SignalServiceMessagePipe pipe, SignalServiceMessagePipe unidentifiedPipe, boolean isMultiDevice) { - this.pipe.set(Optional.fromNullable(pipe)); - this.unidentifiedPipe.set(Optional.fromNullable(unidentifiedPipe)); + public void update(boolean isMultiDevice) { this.isMultiDevice.set(isMultiDevice); } @@ -569,15 +535,14 @@ public class SignalServiceMessageSender { throws NonSuccessfulResponseCodeException, PushNetworkException, MalformedResponseException { AttachmentV2UploadAttributes v2UploadAttributes = null; - Optional localPipe = pipe.get(); - if (localPipe.isPresent()) { - Log.d(TAG, "Using pipe to retrieve attachment upload attributes..."); - try { - v2UploadAttributes = localPipe.get().getAttachmentV2UploadAttributes(); - } catch (IOException e) { - Log.w(TAG, "Failed to retrieve attachment upload attributes using pipe. Falling back..."); - } + Log.d(TAG, "Using pipe to retrieve attachment upload attributes..."); + try { + v2UploadAttributes = new AttachmentService.AttachmentAttributesResponseProcessor<>(attachmentService.getAttachmentV2UploadAttributes().blockingGet()).getResultOrThrow(); + } catch (WebSocketUnavailableException e) { + Log.w(TAG, "[uploadAttachmentV2] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } catch (IOException e) { + Log.w(TAG, "Failed to retrieve attachment upload attributes using pipe. Falling back..."); } if (v2UploadAttributes == null) { @@ -606,15 +571,14 @@ public class SignalServiceMessageSender { public ResumableUploadSpec getResumableUploadSpec() throws IOException { AttachmentV3UploadAttributes v3UploadAttributes = null; - Optional localPipe = pipe.get(); - if (localPipe.isPresent()) { - Log.d(TAG, "Using pipe to retrieve attachment upload attributes..."); - try { - v3UploadAttributes = localPipe.get().getAttachmentV3UploadAttributes(); - } catch (IOException e) { - Log.w(TAG, "Failed to retrieve attachment upload attributes using pipe. Falling back..."); - } + Log.d(TAG, "Using pipe to retrieve attachment upload attributes..."); + try { + v3UploadAttributes = new AttachmentService.AttachmentAttributesResponseProcessor<>(attachmentService.getAttachmentV3UploadAttributes().blockingGet()).getResultOrThrow(); + } catch (WebSocketUnavailableException e) { + Log.w(TAG, "[getResumableUploadSpec] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } catch (IOException e) { + Log.w(TAG, "Failed to retrieve attachment upload attributes using pipe. Falling back..."); } if (v3UploadAttributes == null) { @@ -800,7 +764,7 @@ public class SignalServiceMessageSender { .setLength(mention.getLength()) .setMentionUuid(mention.getUuid().toString())); } - + builder.setRequiredProtocolVersion(Math.max(DataMessage.ProtocolVersion.MENTIONS_VALUE, builder.getRequiredProtocolVersion())); } @@ -1631,22 +1595,23 @@ public class SignalServiceMessageSender { throw new CancelationException(); } - Optional pipe = this.pipe.get(); - Optional unidentifiedPipe = this.unidentifiedPipe.get(); - - if (pipe.isPresent() && !unidentifiedAccess.isPresent()) { + if (!unidentifiedAccess.isPresent()) { try { - SendMessageResponse response = pipe.get().send(messages, Optional.absent()).get(10, TimeUnit.SECONDS); + SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, Optional.absent()).blockingGet()).getResultOrThrow(); return SendMessageResult.success(recipient, messages.getDevices(), false, response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime, content.getContent()); - } catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { + } catch (WebSocketUnavailableException e) { + Log.i(TAG, "[sendMessage] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } catch (IOException e) { Log.w(TAG, e); Log.w(TAG, "[sendMessage] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } - } else if (unidentifiedPipe.isPresent() && unidentifiedAccess.isPresent()) { + } else if (unidentifiedAccess.isPresent()) { try { - SendMessageResponse response = unidentifiedPipe.get().send(messages, unidentifiedAccess).get(10, TimeUnit.SECONDS); + SendMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.send(messages, unidentifiedAccess).blockingGet()).getResultOrThrow(); return SendMessageResult.success(recipient, messages.getDevices(), true, response.getNeedsSync() || isMultiDevice.get(), System.currentTimeMillis() - startTime, content.getContent()); - } catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { + } catch (WebSocketUnavailableException e) { + Log.i(TAG, "[sendMessage] Unidentified pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } catch (IOException e) { Log.w(TAG, e); Log.w(TAG, "[sendMessage] Unidentified pipe failed, falling back..."); } @@ -1808,17 +1773,13 @@ public class SignalServiceMessageSender { joinedUnidentifiedAccess = ByteArrayUtil.xor(joinedUnidentifiedAccess, access.getUnidentifiedAccessKey()); } - Optional pipe = this.unidentifiedPipe.get(); - - if (pipe.isPresent()) { - try { - SendGroupMessageResponse response = pipe.get().sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).get(10, TimeUnit.SECONDS); - return transformGroupResponseToMessageResults(recipientDevices, response, content); - } catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { - Log.w(TAG, "[sendGroupMessage] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); - } - } else { - Log.d(TAG, "[sendGroupMessage] No pipe available."); + try { + SendGroupMessageResponse response = new MessagingService.SendResponseProcessor<>(messagingService.sendToGroup(ciphertext, joinedUnidentifiedAccess, timestamp, online).blockingGet()).getResultOrThrow(); + return transformGroupResponseToMessageResults(recipientDevices, response, content); + } catch (WebSocketUnavailableException e) { + Log.i(TAG, "[sendGroupMessage] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); + } catch (IOException e) { + Log.w(TAG, "[sendGroupMessage] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")"); } try { diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java new file mode 100644 index 0000000000..b00a5704ec --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java @@ -0,0 +1,220 @@ +package org.whispersystems.signalservice.api; + +import org.whispersystems.libsignal.logging.Log; +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; +import org.whispersystems.signalservice.api.websocket.WebSocketFactory; +import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; +import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; +import org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketRequestMessage; +import org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketResponseMessage; +import org.whispersystems.signalservice.internal.websocket.WebsocketResponse; +import org.whispersystems.util.Base64; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Single; + +/** + * Provide a general interface to the WebSocket for making requests and reading messages sent by the server. + * Where appropriate, it will handle retrying failed unidentified requests on the regular WebSocket. + */ +public final class SignalWebSocket { + + private static final String TAG = SignalWebSocket.class.getSimpleName(); + + private static final String SERVER_DELIVERED_TIMESTAMP_HEADER = "X-Signal-Timestamp"; + + private final WebSocketFactory webSocketFactory; + + private WebSocketConnection webSocket; + private WebSocketConnection unidentifiedWebSocket; + private boolean canConnect; + + public SignalWebSocket(WebSocketFactory webSocketFactory) { + this.webSocketFactory = webSocketFactory; + } + + public synchronized void connect() { + canConnect = true; + try { + getWebSocket(); + getUnidentifiedWebSocket(); + } catch (WebSocketUnavailableException e) { + throw new AssertionError(e); + } + } + + public synchronized void disconnect() { + canConnect = false; + + if (webSocket != null) { + webSocket.disconnect(); + webSocket = null; + } + + if (unidentifiedWebSocket != null) { + unidentifiedWebSocket.disconnect(); + unidentifiedWebSocket = null; + } + } + + private synchronized WebSocketConnection getWebSocket() throws WebSocketUnavailableException { + if (!canConnect) { + throw new WebSocketUnavailableException(); + } + + if (webSocket == null || webSocket.isDead()) { + webSocket = webSocketFactory.createWebSocket(); + webSocket.connect(); + } + return webSocket; + } + + private synchronized WebSocketConnection getUnidentifiedWebSocket() throws WebSocketUnavailableException { + if (!canConnect) { + throw new WebSocketUnavailableException(); + } + + if (unidentifiedWebSocket == null || unidentifiedWebSocket.isDead()) { + unidentifiedWebSocket = webSocketFactory.createUnidentifiedWebSocket(); + unidentifiedWebSocket.connect(); + } + return unidentifiedWebSocket; + } + + public Single request(WebSocketRequestMessage requestMessage) { + try { + return getWebSocket().sendRequest(requestMessage); + } catch (IOException e) { + return Single.error(e); + } + } + + public Single request(WebSocketRequestMessage requestMessage, Optional unidentifiedAccess) { + if (unidentifiedAccess.isPresent()) { + WebSocketRequestMessage message = WebSocketRequestMessage.newBuilder(requestMessage) + .addHeaders("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey())) + .build(); + Single response; + try { + response = getUnidentifiedWebSocket().sendRequest(message); + } catch (IOException e) { + return Single.error(e); + } + + return response.flatMap(r -> { + if (r.getStatus() == 401) { + return request(requestMessage); + } + return Single.just(r); + }) + .onErrorResumeNext(t -> request(requestMessage)); + } else { + return request(requestMessage); + } + } + + /** + *

+ * A blocking call that reads a message off the pipe. When this call returns, the message has been + * acknowledged and will not be retransmitted. This will return {@link Optional#absent()} when an + * empty response is hit, which indicates the WebSocket is empty. + *

+ * You can specify a {@link MessageReceivedCallback} that will be called before the received message is acknowledged. + * This allows you to write the received message to durable storage before acknowledging receipt of it to the + * server. + *

+ * Important: The empty response will only be hit once for each connection. That means if you get + * an empty response and call readOrEmpty() again on the same instance, you will not get an empty + * response, and instead will block until you get an actual message. This will, however, reset if + * connection breaks (if, for instance, you lose and regain network). + * + * @param timeout The timeout to wait for. + * @param callback A callback that will be called before the message receipt is acknowledged to the server. + * @return The message read (same as the message sent through the callback). + */ + @SuppressWarnings("DuplicateThrows") + public Optional readOrEmpty(long timeout, MessageReceivedCallback callback) + throws TimeoutException, WebSocketUnavailableException, IOException + { + while (true) { + WebSocketRequestMessage request = getWebSocket().readRequest(timeout); + WebSocketResponseMessage response = createWebSocketResponse(request); + try { + if (isSignalServiceEnvelope(request)) { + Optional timestampHeader = findHeader(request); + long timestamp = 0; + + if (timestampHeader.isPresent()) { + try { + timestamp = Long.parseLong(timestampHeader.get()); + } catch (NumberFormatException e) { + Log.w(TAG, "Failed to parse " + SERVER_DELIVERED_TIMESTAMP_HEADER); + } + } + + SignalServiceEnvelope envelope = new SignalServiceEnvelope(request.getBody().toByteArray(), timestamp); + + callback.onMessage(envelope); + return Optional.of(envelope); + } else if (isSocketEmptyRequest(request)) { + return Optional.absent(); + } + } finally { + getWebSocket().sendResponse(response); + } + } + } + + private static boolean isSignalServiceEnvelope(WebSocketRequestMessage message) { + return "PUT".equals(message.getVerb()) && "/api/v1/message".equals(message.getPath()); + } + + private static boolean isSocketEmptyRequest(WebSocketRequestMessage message) { + return "PUT".equals(message.getVerb()) && "/api/v1/queue/empty".equals(message.getPath()); + } + + private static WebSocketResponseMessage createWebSocketResponse(WebSocketRequestMessage request) { + if (isSignalServiceEnvelope(request)) { + return WebSocketResponseMessage.newBuilder() + .setId(request.getId()) + .setStatus(200) + .setMessage("OK") + .build(); + } else { + return WebSocketResponseMessage.newBuilder() + .setId(request.getId()) + .setStatus(400) + .setMessage("Unknown") + .build(); + } + } + + private static Optional findHeader(WebSocketRequestMessage message) { + if (message.getHeadersCount() == 0) { + return Optional.absent(); + } + + for (String header : message.getHeadersList()) { + if (header.startsWith(SERVER_DELIVERED_TIMESTAMP_HEADER)) { + String[] split = header.split(":"); + if (split.length == 2 && split[0].trim().toLowerCase().equals(SERVER_DELIVERED_TIMESTAMP_HEADER.toLowerCase())) { + return Optional.of(split[1].trim()); + } + } + } + + return Optional.absent(); + } + + /** + * For receiving a callback when a new message has been + * received. + */ + public interface MessageReceivedCallback { + void onMessage(SignalServiceEnvelope envelope); + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/AttachmentService.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/AttachmentService.java new file mode 100644 index 0000000000..292780ea1e --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/AttachmentService.java @@ -0,0 +1,56 @@ +package org.whispersystems.signalservice.api.services; + +import org.whispersystems.signalservice.api.SignalWebSocket; +import org.whispersystems.signalservice.internal.ServiceResponse; +import org.whispersystems.signalservice.internal.ServiceResponseProcessor; +import org.whispersystems.signalservice.internal.push.AttachmentV2UploadAttributes; +import org.whispersystems.signalservice.internal.push.AttachmentV3UploadAttributes; +import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper; +import org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketRequestMessage; + +import java.security.SecureRandom; + +import io.reactivex.rxjava3.core.Single; + +/** + * Provide WebSocket based interface to attachment upload endpoints. + * + * Note: To be expanded to have REST fallback and other attachment related operations. + */ +public final class AttachmentService { + private final SignalWebSocket signalWebSocket; + + public AttachmentService(SignalWebSocket signalWebSocket) { + this.signalWebSocket = signalWebSocket; + } + + public Single> getAttachmentV2UploadAttributes() { + WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() + .setId(new SecureRandom().nextLong()) + .setVerb("GET") + .setPath("/v2/attachments/form/upload") + .build(); + + return signalWebSocket.request(requestMessage) + .map(DefaultResponseMapper.getDefault(AttachmentV2UploadAttributes.class)::map) + .onErrorReturn(ServiceResponse::forUnknownError); + } + + public Single> getAttachmentV3UploadAttributes() { + WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() + .setId(new SecureRandom().nextLong()) + .setVerb("GET") + .setPath("/v3/attachments/form/upload") + .build(); + + return signalWebSocket.request(requestMessage) + .map(DefaultResponseMapper.getDefault(AttachmentV3UploadAttributes.class)::map) + .onErrorReturn(ServiceResponse::forUnknownError); + } + + public static class AttachmentAttributesResponseProcessor extends ServiceResponseProcessor { + public AttachmentAttributesResponseProcessor(ServiceResponse response) { + super(response); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java new file mode 100644 index 0000000000..cc11b22f6b --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/MessagingService.java @@ -0,0 +1,94 @@ +package org.whispersystems.signalservice.api.services; + +import com.google.protobuf.ByteString; + +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.SignalWebSocket; +import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; +import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; +import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; +import org.whispersystems.signalservice.internal.ServiceResponse; +import org.whispersystems.signalservice.internal.ServiceResponseProcessor; +import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList; +import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse; +import org.whispersystems.signalservice.internal.push.SendMessageResponse; +import org.whispersystems.signalservice.internal.util.JsonUtil; +import org.whispersystems.signalservice.internal.util.Util; +import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper; +import org.whispersystems.signalservice.internal.websocket.ResponseMapper; +import org.whispersystems.signalservice.internal.websocket.WebSocketProtos.WebSocketRequestMessage; +import org.whispersystems.util.Base64; + +import java.security.SecureRandom; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; + +import io.reactivex.rxjava3.core.Single; + +/** + * Provide WebSocket based interface to message sending endpoints. + *

+ * Note: To be expanded to have REST fallback and other messaging related operations. + */ +public class MessagingService { + private final SignalWebSocket signalWebSocket; + + public MessagingService(SignalWebSocket signalWebSocket) { + this.signalWebSocket = signalWebSocket; + } + + public Single> send(OutgoingPushMessageList list, Optional unidentifiedAccess) { + List headers = new LinkedList() {{ + add("content-type:application/json"); + }}; + + WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() + .setId(new SecureRandom().nextLong()) + .setVerb("PUT") + .setPath(String.format("/v1/messages/%s", list.getDestination())) + .addAllHeaders(headers) + .setBody(ByteString.copyFrom(JsonUtil.toJson(list).getBytes())) + .build(); + + ResponseMapper responseMapper = DefaultResponseMapper.extend(SendMessageResponse.class) + .withResponseMapper((status, body, getHeader) -> { + SendMessageResponse sendMessageResponse = Util.isEmpty(body) ? new SendMessageResponse(false) + : JsonUtil.fromJsonResponse(body, SendMessageResponse.class); + return ServiceResponse.forResult(sendMessageResponse, status, body); + }) + .withCustomError(404, (status, body, getHeader) -> new UnregisteredUserException(list.getDestination(), new NotFoundException("not found"))) + .build(); + + return signalWebSocket.request(requestMessage, unidentifiedAccess) + .map(responseMapper::map) + .onErrorReturn(ServiceResponse::forUnknownError); + } + + public Single> sendToGroup(byte[] body, byte[] joinedUnidentifiedAccess, long timestamp, boolean online) { + List headers = new LinkedList() {{ + add("content-type:application/vnd.signal-messenger.mrm"); + add("Unidentified-Access-Key:" + Base64.encodeBytes(joinedUnidentifiedAccess)); + }}; + + String path = String.format(Locale.US, "/v1/messages/multi_recipient?ts=%s&online=%s", timestamp, online); + + WebSocketRequestMessage requestMessage = WebSocketRequestMessage.newBuilder() + .setId(new SecureRandom().nextLong()) + .setVerb("PUT") + .setPath(path) + .addAllHeaders(headers) + .setBody(ByteString.copyFrom(body)) + .build(); + + return signalWebSocket.request(requestMessage) + .map(DefaultResponseMapper.getDefault(SendGroupMessageResponse.class)::map) + .onErrorReturn(ServiceResponse::forUnknownError); + } + + public static class SendResponseProcessor extends ServiceResponseProcessor { + public SendResponseProcessor(ServiceResponse response) { + super(response); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/ProfileService.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/ProfileService.java new file mode 100644 index 0000000000..e0448b3022 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/services/ProfileService.java @@ -0,0 +1,161 @@ +package org.whispersystems.signalservice.api.services; + +import org.signal.zkgroup.VerificationFailedException; +import org.signal.zkgroup.profiles.ClientZkProfileOperations; +import org.signal.zkgroup.profiles.ProfileKey; +import org.signal.zkgroup.profiles.ProfileKeyCredential; +import org.signal.zkgroup.profiles.ProfileKeyCredentialRequest; +import org.signal.zkgroup.profiles.ProfileKeyCredentialRequestContext; +import org.signal.zkgroup.profiles.ProfileKeyVersion; +import org.whispersystems.libsignal.util.Pair; +import org.whispersystems.libsignal.util.guava.Function; +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.SignalServiceMessageReceiver; +import org.whispersystems.signalservice.api.SignalWebSocket; +import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; +import org.whispersystems.signalservice.api.profiles.ProfileAndCredential; +import org.whispersystems.signalservice.api.profiles.SignalServiceProfile; +import org.whispersystems.signalservice.api.push.SignalServiceAddress; +import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; +import org.whispersystems.signalservice.internal.ServiceResponse; +import org.whispersystems.signalservice.internal.ServiceResponseProcessor; +import org.whispersystems.signalservice.internal.util.Hex; +import org.whispersystems.signalservice.internal.util.JsonUtil; +import org.whispersystems.signalservice.internal.websocket.DefaultResponseMapper; +import org.whispersystems.signalservice.internal.websocket.ResponseMapper; +import org.whispersystems.signalservice.internal.websocket.WebSocketProtos; + +import java.security.SecureRandom; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import io.reactivex.rxjava3.core.Single; + +/** + * Provide Profile-related API services, encapsulating the logic to make the request, parse the response, + * and fallback to appropriate WebSocket or RESTful alternatives. + */ +public final class ProfileService { + + private static final String TAG = ProfileService.class.getSimpleName(); + + private final ClientZkProfileOperations clientZkProfileOperations; + private final SignalServiceMessageReceiver receiver; + private final SignalWebSocket signalWebSocket; + + public ProfileService(ClientZkProfileOperations clientZkProfileOperations, + SignalServiceMessageReceiver receiver, + SignalWebSocket signalWebSocket) + { + this.clientZkProfileOperations = clientZkProfileOperations; + this.receiver = receiver; + this.signalWebSocket = signalWebSocket; + } + + public Single> getProfile(SignalServiceAddress address, + Optional profileKey, + Optional unidentifiedAccess, + SignalServiceProfile.RequestType requestType) + { + Optional uuid = address.getUuid(); + SecureRandom random = new SecureRandom(); + ProfileKeyCredentialRequestContext requestContext = null; + + WebSocketProtos.WebSocketRequestMessage.Builder builder = WebSocketProtos.WebSocketRequestMessage.newBuilder() + .setId(random.nextLong()) + .setVerb("GET"); + + if (uuid.isPresent() && profileKey.isPresent()) { + UUID target = uuid.get(); + ProfileKeyVersion profileKeyIdentifier = profileKey.get().getProfileKeyVersion(target); + String version = profileKeyIdentifier.serialize(); + + if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL) { + requestContext = clientZkProfileOperations.createProfileKeyCredentialRequestContext(random, target, profileKey.get()); + + ProfileKeyCredentialRequest request = requestContext.getRequest(); + String credentialRequest = Hex.toStringCondensed(request.serialize()); + + builder.setPath(String.format("/v1/profile/%s/%s/%s", target, version, credentialRequest)); + } else { + builder.setPath(String.format("/v1/profile/%s/%s", target, version)); + } + } else { + builder.setPath(String.format("/v1/profile/%s", address.getIdentifier())); + } + + WebSocketProtos.WebSocketRequestMessage requestMessage = builder.build(); + + ResponseMapper responseMapper = DefaultResponseMapper.extend(ProfileAndCredential.class) + .withResponseMapper(new ProfileResponseMapper(requestType, requestContext)) + .build(); + + return signalWebSocket.request(requestMessage, unidentifiedAccess) + .map(responseMapper::map) + .onErrorResumeNext(t -> restFallback(address, profileKey, unidentifiedAccess, requestType)) + .onErrorReturn(ServiceResponse::forUnknownError); + } + + private Single> restFallback(SignalServiceAddress address, + Optional profileKey, + Optional unidentifiedAccess, + SignalServiceProfile.RequestType requestType) + { + return Single.fromFuture(receiver.retrieveProfile(address, profileKey, unidentifiedAccess, requestType), 10, TimeUnit.SECONDS) + .onErrorResumeNext(t -> Single.fromFuture(receiver.retrieveProfile(address, profileKey, Optional.absent(), requestType), 10, TimeUnit.SECONDS)) + .map(p -> ServiceResponse.forResult(p, 0, null)); + } + + /** + * Maps the API {@link SignalServiceProfile} model into the desired {@link ProfileAndCredential} domain model. + */ + private class ProfileResponseMapper implements DefaultResponseMapper.CustomResponseMapper { + private final SignalServiceProfile.RequestType requestType; + private final ProfileKeyCredentialRequestContext requestContext; + + public ProfileResponseMapper(SignalServiceProfile.RequestType requestType, ProfileKeyCredentialRequestContext requestContext) { + this.requestType = requestType; + this.requestContext = requestContext; + } + + @Override + public ServiceResponse map(int status, String body, Function getHeader) + throws MalformedResponseException + { + try { + SignalServiceProfile signalServiceProfile = JsonUtil.fromJsonResponse(body, SignalServiceProfile.class); + ProfileKeyCredential profileKeyCredential = null; + if (requestContext != null && signalServiceProfile.getProfileKeyCredentialResponse() != null) { + profileKeyCredential = clientZkProfileOperations.receiveProfileKeyCredential(requestContext, signalServiceProfile.getProfileKeyCredentialResponse()); + } + + return ServiceResponse.forResult(new ProfileAndCredential(signalServiceProfile, requestType, Optional.fromNullable(profileKeyCredential)), status, body); + } catch (VerificationFailedException e) { + return ServiceResponse.forApplicationError(e, status, body); + } + } + } + + /** + * Response processor for {@link ProfileAndCredential} service response. + */ + public static final class ProfileResponseProcessor extends ServiceResponseProcessor { + public ProfileResponseProcessor(ServiceResponse response) { + super(response); + } + + public Pair getResult(T with) { + return new Pair<>(with, getResult()); + } + + @Override + public boolean notFound() { + return super.notFound(); + } + + @Override + public boolean genericIoError() { + return super.genericIoError(); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketFactory.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketFactory.java new file mode 100644 index 0000000000..46f03694f6 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketFactory.java @@ -0,0 +1,8 @@ +package org.whispersystems.signalservice.api.websocket; + +import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; + +public interface WebSocketFactory { + WebSocketConnection createWebSocket(); + WebSocketConnection createUnidentifiedWebSocket(); +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketUnavailableException.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketUnavailableException.java new file mode 100644 index 0000000000..b0ed57b9f7 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketUnavailableException.java @@ -0,0 +1,14 @@ +package org.whispersystems.signalservice.api.websocket; + +import java.io.IOException; + +/** + * Thrown when the WebSocket is not available for use by runtime policy. Currently, the + * WebSocket is only available when the app is in the foreground and requested via IncomingMessageObserver. + * Or, when using WebSocket Strategy. + */ +public final class WebSocketUnavailableException extends IOException { + public WebSocketUnavailableException() { + super("WebSocket not currently available."); + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponse.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponse.java new file mode 100644 index 0000000000..0f9fbb3b66 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponse.java @@ -0,0 +1,99 @@ +package org.whispersystems.signalservice.internal; + +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.libsignal.util.guava.Preconditions; +import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; +import org.whispersystems.signalservice.internal.websocket.WebsocketResponse; + +import java.util.concurrent.ExecutionException; + +/** + * Encapsulates a parsed APi response regardless of where it came from (WebSocket or REST). Not only + * includes the success result but also any application errors encountered (404s, parsing, etc.) or + * execution errors encountered (IOException, etc.). + */ +public final class ServiceResponse { + + private final int status; + private final Optional body; + private final Optional result; + private final Optional applicationError; + private final Optional executionError; + + private ServiceResponse(Result result, WebsocketResponse response) { + this(response.getStatus(), response.getBody(), result, null, null); + } + + private ServiceResponse(Throwable applicationError, WebsocketResponse response) { + this(response.getStatus(), response.getBody(), null, applicationError, null); + } + + public ServiceResponse(int status, + String body, + Result result, + Throwable applicationError, + Throwable executionError) + { + if (result != null) { + Preconditions.checkArgument(applicationError == null && executionError == null); + } else { + Preconditions.checkArgument(applicationError != null || executionError != null); + } + + this.status = status; + this.body = Optional.fromNullable(body); + this.result = Optional.fromNullable(result); + this.applicationError = Optional.fromNullable(applicationError); + this.executionError = Optional.fromNullable(executionError); + } + + public int getStatus() { + return status; + } + + public Optional getBody() { + return body; + } + + public Optional getResult() { + return result; + } + + public Optional getApplicationError() { + return applicationError; + } + + public Optional getExecutionError() { + return executionError; + } + + public static ServiceResponse forResult(T result, WebsocketResponse response) { + return new ServiceResponse<>(result, response); + } + + public static ServiceResponse forResult(T result, int status, String body) { + return new ServiceResponse<>(status, body, result, null, null); + } + + public static ServiceResponse forApplicationError(Throwable throwable, WebsocketResponse response) { + return new ServiceResponse(throwable, response); + } + + public static ServiceResponse forApplicationError(Throwable throwable, int status, String body) { + return new ServiceResponse<>(status, body, null, throwable, null); + } + + public static ServiceResponse forExecutionError(Throwable throwable) { + return new ServiceResponse<>(0, null, null, null, throwable); + } + + public static ServiceResponse forUnknownError(Throwable throwable) { + if (throwable instanceof ExecutionException) { + return forUnknownError(throwable.getCause()); + } else if (throwable instanceof NonSuccessfulResponseCodeException) { + return forApplicationError(throwable, ((NonSuccessfulResponseCodeException) throwable).getCode(), null); + } else { + return forExecutionError(throwable); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponseProcessor.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponseProcessor.java new file mode 100644 index 0000000000..67cb6929ed --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/ServiceResponseProcessor.java @@ -0,0 +1,123 @@ +package org.whispersystems.signalservice.internal; + +import org.whispersystems.libsignal.util.guava.Preconditions; +import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * Provide the basis for processing a {@link ServiceResponse} in a sharable, quasi-enforceable + * ways. The goal is to balance the readability at the call sites where the various cases are handled + * and provide call specific information of what should be expected. + *

+ * General premise is for subclasses to override and expose (via access modifier) the types of errors that + * should be handled when processing a response. For example, if {@link #notFound()} should be specifically + * handled then a subclass should override it, change the modifier to public, and then the caller knows it's + * a possible error case. + *

+ * This doesn't exactly enforce the handling like a check exception would, but does hint + * to the caller what they should be aware of as possible outcomes of processing a response. + */ +public abstract class ServiceResponseProcessor { + + protected final ServiceResponse response; + + public ServiceResponseProcessor(ServiceResponse response) { + this.response = response; + } + + public ServiceResponse getResponse() { + return response; + } + + public T getResult() { + Preconditions.checkArgument(response.getResult().isPresent()); + return response.getResult().get(); + } + + public T getResultOrThrow() throws IOException { + if (hasResult()) { + return getResult(); + } + + Throwable error = getError(); + if (error instanceof IOException) { + throw (IOException) error; + } else if (error instanceof RuntimeException) { + throw (RuntimeException) error; + } else if (error instanceof InterruptedException || error instanceof TimeoutException) { + throw new IOException(error); + } else { + throw new IllegalStateException("Unexpected error type for response processor", error); + } + } + + public boolean hasResult() { + return response.getResult().isPresent(); + } + + protected Throwable getError() { + return response.getApplicationError().or(response.getExecutionError()).orNull(); + } + + protected boolean authorizationFailed() { + return response.getStatus() == 401 || response.getStatus() == 403; + } + + protected boolean notFound() { + return response.getStatus() == 404; + } + + protected boolean mismatchedDevices() { + return response.getStatus() == 409; + } + + protected boolean staleDevices() { + return response.getStatus() == 410; + } + + protected boolean deviceLimitedExceeded() { + return response.getStatus() == 411; + } + + protected boolean rateLimit() { + return response.getStatus() == 413; + } + + protected boolean expectationFailed() { + return response.getStatus() == 417; + } + + protected boolean registrationLock() { + return response.getStatus() == 423; + } + + protected boolean proofRequired() { + return response.getStatus() == 428; + } + + protected boolean deprecatedVersion() { + return response.getStatus() == 499; + } + + protected boolean serverRejected() { + return response.getStatus() == 508; + } + + protected boolean notSuccessful() { + return response.getStatus() != 200 && response.getStatus() != 202 && response.getStatus() != 204; + } + + protected boolean genericIoError() { + Throwable error = getError(); + + if (error instanceof NonSuccessfulResponseCodeException) { + return false; + } + + return error instanceof IOException || + error instanceof TimeoutException || + error instanceof InterruptedException; + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/LockedException.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/LockedException.java index 4ec2310a2f..8ab00aa22a 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/LockedException.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/LockedException.java @@ -9,7 +9,7 @@ public final class LockedException extends NonSuccessfulResponseCodeException { private final long timeRemaining; private final String basicStorageCredentials; - LockedException(int length, long timeRemaining, String basicStorageCredentials) { + public LockedException(int length, long timeRemaining, String basicStorageCredentials) { super(423); this.length = length; this.timeRemaining = timeRemaining; diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index 23dc9940a9..776ea3abd6 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -2043,15 +2043,15 @@ public class PushServiceSocket { } } - private static class RegistrationLockFailure { + public static class RegistrationLockFailure { @JsonProperty - private int length; + public int length; @JsonProperty - private long timeRemaining; + public long timeRemaining; @JsonProperty - private AuthCredentials backupCredentials; + public AuthCredentials backupCredentials; } private static class ConnectionHolder { diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/JsonUtil.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/JsonUtil.java index 5bb8d0d903..d0341c527a 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/JsonUtil.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/util/JsonUtil.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.SerializerProvider; import org.whispersystems.libsignal.IdentityKey; import org.whispersystems.libsignal.InvalidKeyException; import org.whispersystems.libsignal.logging.Log; +import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; import org.whispersystems.signalservice.api.util.UuidUtil; import org.whispersystems.util.Base64; @@ -50,6 +51,15 @@ public class JsonUtil { { return objectMapper.readValue(json, clazz); } + + public static T fromJsonResponse(String body, Class clazz) + throws MalformedResponseException { + try { + return JsonUtil.fromJson(body, clazz); + } catch (IOException e) { + throw new MalformedResponseException("Unable to parse entity", e); + } + } public static class IdentityKeySerializer extends JsonSerializer { @Override diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java new file mode 100644 index 0000000000..683fd163f9 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultErrorMapper.java @@ -0,0 +1,149 @@ +package org.whispersystems.signalservice.internal.websocket; + +import org.whispersystems.libsignal.util.guava.Function; +import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; +import org.whispersystems.signalservice.api.push.exceptions.DeprecatedVersionException; +import org.whispersystems.signalservice.api.push.exceptions.ExpectationFailedException; +import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; +import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; +import org.whispersystems.signalservice.api.push.exceptions.NotFoundException; +import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException; +import org.whispersystems.signalservice.api.push.exceptions.RateLimitException; +import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException; +import org.whispersystems.signalservice.internal.push.AuthCredentials; +import org.whispersystems.signalservice.internal.push.DeviceLimit; +import org.whispersystems.signalservice.internal.push.DeviceLimitExceededException; +import org.whispersystems.signalservice.internal.push.LockedException; +import org.whispersystems.signalservice.internal.push.MismatchedDevices; +import org.whispersystems.signalservice.internal.push.ProofRequiredResponse; +import org.whispersystems.signalservice.internal.push.PushServiceSocket; +import org.whispersystems.signalservice.internal.push.StaleDevices; +import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException; +import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException; +import org.whispersystems.signalservice.internal.util.JsonUtil; +import org.whispersystems.signalservice.internal.util.Util; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A default implementation of a {@link ErrorMapper} that can parse most known application + * errors. + *

+ * Can be extended to add custom error mapping via {@link #extend()}. + *

+ * While this call can be used directly, it is primarily intended to be used as part of + * {@link DefaultResponseMapper}. + */ +public final class DefaultErrorMapper implements ErrorMapper { + + private static final DefaultErrorMapper INSTANCE = new DefaultErrorMapper(); + + private final Map customErrorMappers; + + public static DefaultErrorMapper getDefault() { + return INSTANCE; + } + + public static DefaultErrorMapper.Builder extend() { + return new DefaultErrorMapper.Builder(); + } + + private DefaultErrorMapper() { + this(Collections.emptyMap()); + } + + private DefaultErrorMapper(Map customErrorMappers) { + this.customErrorMappers = customErrorMappers; + } + + public Throwable parseError(WebsocketResponse websocketResponse) { + return parseError(websocketResponse.getStatus(), websocketResponse.getBody(), websocketResponse::getHeader); + } + + @Override + public Throwable parseError(int status, String body, Function getHeader) { + if (customErrorMappers.containsKey(status)) { + return customErrorMappers.get(status).parseError(status, body, getHeader); + } + + switch (status) { + case 401: + case 403: + return new AuthorizationFailedException(status, "Authorization failed!"); + case 404: + return new NotFoundException("Not found"); + case 409: + try { + return new MismatchedDevicesException(JsonUtil.fromJsonResponse(body, MismatchedDevices.class)); + } catch (MalformedResponseException e) { + return e; + } + case 410: + try { + return new StaleDevicesException(JsonUtil.fromJsonResponse(body, StaleDevices.class)); + } catch (MalformedResponseException e) { + return e; + } + case 411: + try { + return new DeviceLimitExceededException(JsonUtil.fromJsonResponse(body, DeviceLimit.class)); + } catch (MalformedResponseException e) { + return e; + } + case 413: + return new RateLimitException("Rate limit exceeded: " + status); + case 417: + return new ExpectationFailedException(); + case 423: + PushServiceSocket.RegistrationLockFailure accountLockFailure; + try { + accountLockFailure = JsonUtil.fromJsonResponse(body, PushServiceSocket.RegistrationLockFailure.class); + } catch (MalformedResponseException e) { + return e; + } + + AuthCredentials credentials = accountLockFailure.backupCredentials; + String basicStorageCredentials = credentials != null ? credentials.asBasic() : null; + + return new LockedException(accountLockFailure.length, + accountLockFailure.timeRemaining, + basicStorageCredentials); + case 428: + ProofRequiredResponse proofRequiredResponse; + try { + proofRequiredResponse = JsonUtil.fromJsonResponse(body, ProofRequiredResponse.class); + } catch (MalformedResponseException e) { + return e; + } + String retryAfterRaw = getHeader.apply("Retry-After"); + long retryAfter = Util.parseInt(retryAfterRaw, -1); + + return new ProofRequiredException(proofRequiredResponse, retryAfter); + case 499: + return new DeprecatedVersionException(); + case 508: + return new ServerRejectedException(); + } + + if (status != 200 && status != 202 && status != 204) { + return new NonSuccessfulResponseCodeException(status, "Bad response: " + status); + } + + return null; + } + + public static class Builder { + private final Map customErrorMappers = new HashMap<>(); + + public Builder withCustom(int status, ErrorMapper errorMapper) { + customErrorMappers.put(status, errorMapper); + return this; + } + + public ErrorMapper build() { + return new DefaultErrorMapper(customErrorMappers); + } + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java new file mode 100644 index 0000000000..e622be739f --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/DefaultResponseMapper.java @@ -0,0 +1,86 @@ +package org.whispersystems.signalservice.internal.websocket; + +import org.whispersystems.libsignal.util.guava.Function; +import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException; +import org.whispersystems.signalservice.internal.ServiceResponse; +import org.whispersystems.signalservice.internal.util.JsonUtil; + +import java.util.Objects; + +/** + * A default implementation of a {@link ResponseMapper} that can parse most known + * application errors via {@link DefaultErrorMapper} and provides basic JSON parsing of the + * response model if possible. + *

+ * Can be extended to add custom parsing for both the result type and the error cases. + *

+ * See {@link #extend(Class)} and {@link DefaultErrorMapper#extend()}. + */ +public class DefaultResponseMapper implements ResponseMapper { + + private final Class clazz; + private final ErrorMapper errorMapper; + private final CustomResponseMapper customResponseMapper; + + public static DefaultResponseMapper getDefault(Class clazz) { + return new DefaultResponseMapper<>(clazz); + } + + public static DefaultResponseMapper.Builder extend(Class clazz) { + return new DefaultResponseMapper.Builder<>(clazz); + } + + private DefaultResponseMapper(Class clazz) { + this(clazz, null, DefaultErrorMapper.getDefault()); + } + + private DefaultResponseMapper(Class clazz, CustomResponseMapper customResponseMapper, ErrorMapper errorMapper) { + this.clazz = clazz; + this.customResponseMapper = customResponseMapper; + this.errorMapper = errorMapper; + } + + @Override + public ServiceResponse map(int status, String body, Function getHeader) { + Throwable applicationError = errorMapper.parseError(status, body, getHeader); + if (applicationError == null) { + try { + if (customResponseMapper != null) { + return Objects.requireNonNull(customResponseMapper.map(status, body, getHeader)); + } + return ServiceResponse.forResult(JsonUtil.fromJsonResponse(body, clazz), status, body); + } catch (MalformedResponseException e) { + applicationError = e; + } + } + return ServiceResponse.forApplicationError(applicationError, status, body); + } + + public static class Builder { + private final Class clazz; + private DefaultErrorMapper.Builder errorMapperBuilder = DefaultErrorMapper.extend(); + private CustomResponseMapper customResponseMapper; + + public Builder(Class clazz) { + this.clazz = clazz; + } + + public Builder withResponseMapper(CustomResponseMapper responseMapper) { + this.customResponseMapper = responseMapper; + return this; + } + + public Builder withCustomError(int status, ErrorMapper errorMapper) { + errorMapperBuilder.withCustom(status, errorMapper); + return this; + } + + public ResponseMapper build() { + return new DefaultResponseMapper<>(clazz, customResponseMapper, errorMapperBuilder.build()); + } + } + + public interface CustomResponseMapper { + ServiceResponse map(int status, String body, Function getHeader) throws MalformedResponseException; + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java new file mode 100644 index 0000000000..344d320e64 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ErrorMapper.java @@ -0,0 +1,13 @@ +package org.whispersystems.signalservice.internal.websocket; + +import org.whispersystems.libsignal.util.guava.Function; + +/** + * Can map an API response to an appropriate {@link Throwable}. + *

+ * Unless you need to do something really special, you should only be implementing this to customize + * {@link DefaultErrorMapper}. + */ +public interface ErrorMapper { + Throwable parseError(int status, String body, Function getHeader); +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ResponseMapper.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ResponseMapper.java new file mode 100644 index 0000000000..0b202ae65b --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/ResponseMapper.java @@ -0,0 +1,23 @@ +package org.whispersystems.signalservice.internal.websocket; + +import org.whispersystems.libsignal.util.guava.Function; +import org.whispersystems.signalservice.internal.ServiceResponse; + +/** + * Responsible for taking an API response and converting it to a {@link ServiceResponse}. This includes + * parsing for a success as well as any application errors. All errors (application or parsing related) + * are encapsulated in an error version of a {@link ServiceResponse}, hence why no method throws an + * exception. + *

+ * Unless you need to do something really special, you should only be extending this to be provided to + * {@link DefaultResponseMapper}. + * + * @param - The final type the API response will map into. + */ +public interface ResponseMapper { + ServiceResponse map(int status, String body, Function getHeader); + + default ServiceResponse map(WebsocketResponse response) { + return map(response.getStatus(), response.getBody(), response::getHeader); + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java index f52d51d050..d8c05d22b6 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java @@ -12,10 +12,9 @@ import org.whispersystems.signalservice.api.util.Tls12SocketFactory; import org.whispersystems.signalservice.api.util.TlsProxySocketFactory; import org.whispersystems.signalservice.api.websocket.ConnectivityListener; import org.whispersystems.signalservice.internal.configuration.SignalProxy; +import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration; import org.whispersystems.signalservice.internal.util.BlacklistingTrustManager; import org.whispersystems.signalservice.internal.util.Util; -import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture; -import org.whispersystems.signalservice.internal.util.concurrent.SettableFuture; import java.io.IOException; import java.security.KeyManagementException; @@ -25,7 +24,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +33,9 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.SingleSubject; import okhttp3.ConnectionSpec; import okhttp3.Dns; import okhttp3.Interceptor; @@ -54,53 +55,55 @@ public class WebSocketConnection extends WebSocketListener { private static final String TAG = WebSocketConnection.class.getSimpleName(); private static final int KEEPALIVE_TIMEOUT_SECONDS = 55; - private final LinkedList incomingRequests = new LinkedList<>(); - private final Map outgoingRequests = new HashMap<>(); + private final LinkedList incomingRequests = new LinkedList<>(); + private final Map outgoingRequests = new HashMap<>(); + private final String name; private final String wsUri; private final TrustStore trustStore; private final Optional credentialsProvider; private final String signalAgent; - private final ConnectivityListener listener; + private ConnectivityListener listener; private final SleepTimer sleepTimer; private final List interceptors; private final Optional dns; private final Optional signalProxy; - private WebSocket client; - private KeepAliveSender keepAliveSender; - private int attempts; - private boolean connected; + private WebSocket client; + private KeepAliveSender keepAliveSender; + private int attempts; + private boolean connected; - public WebSocketConnection(String httpUri, - TrustStore trustStore, + public WebSocketConnection(String name, + SignalServiceConfiguration serviceConfiguration, Optional credentialsProvider, String signalAgent, ConnectivityListener listener, - SleepTimer timer, - List interceptors, - Optional dns, - Optional signalProxy) + SleepTimer timer) { - this.trustStore = trustStore; + this.name = "[" + name + ":" + System.identityHashCode(this) + "]"; + this.trustStore = serviceConfiguration.getSignalServiceUrls()[0].getTrustStore(); this.credentialsProvider = credentialsProvider; this.signalAgent = signalAgent; this.listener = listener; this.sleepTimer = timer; - this.interceptors = interceptors; - this.dns = dns; - this.signalProxy = signalProxy; + this.interceptors = serviceConfiguration.getNetworkInterceptors(); + this.dns = serviceConfiguration.getDns(); + this.signalProxy = serviceConfiguration.getSignalProxy(); this.attempts = 0; this.connected = false; - String uri = httpUri.replace("https://", "wss://").replace("http://", "ws://"); + String uri = serviceConfiguration.getSignalServiceUrls()[0].getUrl().replace("https://", "wss://").replace("http://", "ws://"); - if (credentialsProvider.isPresent()) this.wsUri = uri + "/v1/websocket/?login=%s&password=%s"; - else this.wsUri = uri + "/v1/websocket/"; + if (credentialsProvider.isPresent()) { + this.wsUri = uri + "/v1/websocket/?login=%s&password=%s"; + } else { + this.wsUri = uri + "/v1/websocket/"; + } } public synchronized void connect() { - Log.i(TAG, "connect()"); + log("connect()"); if (client == null) { String filledUri; @@ -146,8 +149,12 @@ public class WebSocketConnection extends WebSocketListener { } } + public synchronized boolean isDead() { + return client == null; + } + public synchronized void disconnect() { - Log.i(TAG, "disconnect()"); + log("disconnect()"); if (client != null) { client.close(1000, "OK"); @@ -160,6 +167,11 @@ public class WebSocketConnection extends WebSocketListener { keepAliveSender = null; } + if (listener != null) { + listener.onDisconnected(); + listener = null; + } + notifyAll(); } @@ -176,27 +188,36 @@ public class WebSocketConnection extends WebSocketListener { Util.wait(this, Math.max(1, timeoutMillis - elapsedTime(startTime))); } - if (incomingRequests.isEmpty() && client == null) throw new IOException("Connection closed!"); - else if (incomingRequests.isEmpty()) throw new TimeoutException("Timeout exceeded"); - else return incomingRequests.removeFirst(); + if (incomingRequests.isEmpty() && client == null) { + throw new IOException("Connection closed!"); + } else if (incomingRequests.isEmpty()) { + throw new TimeoutException("Timeout exceeded"); + } else { + return incomingRequests.removeFirst(); + } } - public synchronized ListenableFuture sendRequest(WebSocketRequestMessage request) throws IOException { - if (client == null || !connected) throw new IOException("No connection!"); + public synchronized Single sendRequest(WebSocketRequestMessage request) throws IOException { + if (client == null || !connected) { + throw new IOException("No connection!"); + } WebSocketMessage message = WebSocketMessage.newBuilder() .setType(WebSocketMessage.Type.REQUEST) .setRequest(request) .build(); - SettableFuture future = new SettableFuture<>(); - outgoingRequests.put(request.getId(), new OutgoingRequest(future, System.currentTimeMillis())); + SingleSubject single = SingleSubject.create(); + + outgoingRequests.put(request.getId(), new OutgoingRequest(single)); if (!client.send(ByteString.of(message.toByteArray()))) { throw new IOException("Write failed!"); } - return future; + return single.subscribeOn(Schedulers.io()) + .observeOn(Schedulers.io()) + .timeout(10, TimeUnit.SECONDS); } public synchronized void sendResponse(WebSocketResponseMessage response) throws IOException { @@ -234,13 +255,15 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onOpen(WebSocket webSocket, Response response) { if (client != null && keepAliveSender == null) { - Log.i(TAG, "onOpen() connected"); + log("onOpen() connected"); attempts = 0; connected = true; keepAliveSender = new KeepAliveSender(); keepAliveSender.start(); - if (listener != null) listener.onConnected(); + if (listener != null) { + listener.onConnected(); + } } } @@ -249,33 +272,33 @@ public class WebSocketConnection extends WebSocketListener { try { WebSocketMessage message = WebSocketMessage.parseFrom(payload.toByteArray()); - if (message.getType().getNumber() == WebSocketMessage.Type.REQUEST_VALUE) { + if (message.getType().getNumber() == WebSocketMessage.Type.REQUEST_VALUE) { incomingRequests.add(message.getRequest()); } else if (message.getType().getNumber() == WebSocketMessage.Type.RESPONSE_VALUE) { - OutgoingRequest listener = outgoingRequests.get(message.getResponse().getId()); + OutgoingRequest listener = outgoingRequests.remove(message.getResponse().getId()); if (listener != null) { - listener.getResponseFuture().set(new WebsocketResponse(message.getResponse().getStatus(), - new String(message.getResponse().getBody().toByteArray()), - message.getResponse().getHeadersList())); + listener.onSuccess(new WebsocketResponse(message.getResponse().getStatus(), + new String(message.getResponse().getBody().toByteArray()), + message.getResponse().getHeadersList())); } } notifyAll(); } catch (InvalidProtocolBufferException e) { - Log.w(TAG, e); + warn(e); } } @Override public synchronized void onClosed(WebSocket webSocket, int code, String reason) { - Log.i(TAG, "onClose()"); + log("onClose()"); this.connected = false; Iterator> iterator = outgoingRequests.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); - entry.getValue().getResponseFuture().setException(new IOException("Closed: " + code + ", " + reason)); + entry.getValue().onError(new IOException("Closed: " + code + ", " + reason)); iterator.remove(); } @@ -291,6 +314,7 @@ public class WebSocketConnection extends WebSocketListener { Util.wait(this, Math.min(++attempts * 200, TimeUnit.SECONDS.toMillis(15))); if (client != null) { + log("Client not null when closed, attempting to reconnect"); client.close(1000, "OK"); client = null; connected = false; @@ -302,7 +326,7 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onFailure(WebSocket webSocket, Throwable t, Response response) { - Log.w(TAG, "onFailure()", t); + warn("onFailure()", t); if (response != null && (response.code() == 401 || response.code() == 403)) { if (listener != null) { @@ -311,7 +335,7 @@ public class WebSocketConnection extends WebSocketListener { } else if (listener != null) { boolean shouldRetryConnection = listener.onGenericFailure(response, t); if (!shouldRetryConnection) { - Log.w(TAG, "Experienced a failure, and the listener indicated we should not retry the connection. Disconnecting."); + warn("Experienced a failure, and the listener indicated we should not retry the connection. Disconnecting."); disconnect(); } } @@ -328,7 +352,7 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onClosing(WebSocket webSocket, int code, String reason) { - Log.i(TAG, "onClosing()"); + log("onClosing()"); webSocket.close(1000, "OK"); } @@ -342,25 +366,45 @@ public class WebSocketConnection extends WebSocketListener { TrustManager[] trustManagers = BlacklistingTrustManager.createFor(trustStore); context.init(null, trustManagers, null); - return new Pair<>(context.getSocketFactory(), (X509TrustManager)trustManagers[0]); + return new Pair<>(context.getSocketFactory(), (X509TrustManager) trustManagers[0]); } catch (NoSuchAlgorithmException | KeyManagementException e) { throw new AssertionError(e); } } + private void log(String message) { + Log.i(TAG, name + " " + message); + } + + @SuppressWarnings("SameParameterValue") + private void warn(String message) { + Log.w(TAG, name + " " + message); + } + + private void warn(Throwable e) { + Log.w(TAG, name, e); + } + + @SuppressWarnings("SameParameterValue") + private void warn(String message, Throwable e) { + Log.w(TAG, name + " " + message, e); + } + private class KeepAliveSender extends Thread { - private AtomicBoolean stop = new AtomicBoolean(false); + private final AtomicBoolean stop = new AtomicBoolean(false); public void run() { while (!stop.get()) { try { sleepTimer.sleep(TimeUnit.SECONDS.toMillis(KEEPALIVE_TIMEOUT_SECONDS)); - Log.d(TAG, "Sending keep alive..."); - sendKeepAlive(); + if (!stop.get()) { + log("Sending keep alive..."); + sendKeepAlive(); + } } catch (Throwable e) { - Log.w(TAG, e); + warn(e); } } } @@ -371,20 +415,18 @@ public class WebSocketConnection extends WebSocketListener { } private static class OutgoingRequest { - private final SettableFuture responseFuture; - private final long startTimestamp; + private final SingleSubject responseSingle; - private OutgoingRequest(SettableFuture future, long startTimestamp) { - this.responseFuture = future; - this.startTimestamp = startTimestamp; + private OutgoingRequest(SingleSubject future) { + this.responseSingle = future; } - SettableFuture getResponseFuture() { - return responseFuture; + public void onSuccess(WebsocketResponse response) { + responseSingle.onSuccess(response); } - long getStartTimestamp() { - return startTimestamp; + public void onError(Throwable throwable) { + responseSingle.onError(throwable); } } } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketEventListener.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketEventListener.java deleted file mode 100644 index 65953e32a9..0000000000 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketEventListener.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.whispersystems.signalservice.internal.websocket; - -public interface WebSocketEventListener { - - public void onMessage(byte[] payload); - public void onClose(); - public void onConnected(); - -}