Initial WebSocket refactor.

This commit is contained in:
Cody Henthorne
2021-07-09 10:01:24 -04:00
committed by Greyson Parrelli
parent 916006e664
commit 5d6d78a51e
30 changed files with 1438 additions and 869 deletions

View File

@@ -127,7 +127,7 @@ public class ApplicationContext extends MultiDexApplication implements AppForegr
})
.addBlocking("crash-handling", this::initializeCrashHandling)
.addBlocking("rx-init", () -> {
RxJavaPlugins.setInitIoSchedulerHandler(schedulerSupplier -> Schedulers.from(SignalExecutors.UNBOUNDED, true, false));
RxJavaPlugins.setInitIoSchedulerHandler(schedulerSupplier -> Schedulers.from(SignalExecutors.BOUNDED_IO, true, false));
RxJavaPlugins.setInitComputationSchedulerHandler(schedulerSupplier -> Schedulers.from(SignalExecutors.BOUNDED, true, false));
})
.addBlocking("eat-db", () -> DatabaseFactory.getInstance(this))

View File

@@ -53,8 +53,9 @@ import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential;
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile;
import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.services.ProfileService;
import org.whispersystems.signalservice.api.util.UuidUtil;
import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture;
import org.whispersystems.signalservice.internal.ServiceResponse;
import java.io.IOException;
import java.util.Calendar;
@@ -66,9 +67,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
/**
* Manages all the stuff around determining if a user is registered or not.
@@ -131,7 +133,7 @@ public class DirectoryHelper {
Stopwatch stopwatch = new Stopwatch("single");
RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context);
RegisteredState originalRegisteredState = recipient.resolve().getRegistered();
RegisteredState newRegisteredState = null;
RegisteredState newRegisteredState;
if (recipient.hasUuid() && !recipient.hasE164()) {
boolean isRegistered = isUuidRegistered(context, recipient);
@@ -510,29 +512,34 @@ public class DirectoryHelper {
.filter(r -> hasCommunicatedWith(context, r))
.toList();
List<Pair<Recipient, ListenableFuture<ProfileAndCredential>>> futures = Stream.of(possiblyUnlisted)
.map(r -> new Pair<>(r, ProfileUtil.retrieveProfile(context, r, SignalServiceProfile.RequestType.PROFILE)))
.toList();
Set<RecipientId> potentiallyActiveIds = new HashSet<>();
Set<RecipientId> retries = new HashSet<>();
ProfileService profileService = new ProfileService(ApplicationDependencies.getGroupsV2Operations().getProfileOperations(),
ApplicationDependencies.getSignalServiceMessageReceiver(),
ApplicationDependencies.getSignalWebSocket());
Stream.of(futures)
.forEach(pair -> {
try {
pair.second().get(5, TimeUnit.SECONDS);
potentiallyActiveIds.add(pair.first().getId());
} catch (InterruptedException | TimeoutException e) {
retries.add(pair.first().getId());
potentiallyActiveIds.add(pair.first().getId());
} catch (ExecutionException e) {
if (!(e.getCause() instanceof NotFoundException)) {
retries.add(pair.first().getId());
potentiallyActiveIds.add(pair.first().getId());
}
}
});
List<Observable<Pair<Recipient, ServiceResponse<ProfileAndCredential>>>> requests = Stream.of(possiblyUnlisted)
.map(r -> ProfileUtil.retrieveProfile(context, r, SignalServiceProfile.RequestType.PROFILE, profileService)
.toObservable()
.timeout(5, TimeUnit.SECONDS)
.onErrorReturn(t -> new Pair<>(r, ServiceResponse.forUnknownError(t))))
.toList();
return new UnlistedResult(potentiallyActiveIds, retries);
return Observable.mergeDelayError(requests)
.observeOn(Schedulers.io(), true)
.scan(new UnlistedResult.Builder(), (builder, pair) -> {
Recipient recipient = pair.first();
ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second());
if (processor.hasResult()) {
builder.potentiallyActiveIds.add(recipient.getId());
} else if (processor.genericIoError() || !processor.notFound()) {
builder.retries.add(recipient.getId());
builder.potentiallyActiveIds.add(recipient.getId());
}
return builder;
})
.lastOrError()
.map(UnlistedResult.Builder::build)
.blockingGet();
}
private static boolean hasCommunicatedWith(@NonNull Context context, @NonNull Recipient recipient) {
@@ -584,6 +591,15 @@ public class DirectoryHelper {
@NonNull Set<RecipientId> getRetries() {
return retries;
}
private static class Builder {
final Set<RecipientId> potentiallyActiveIds = new HashSet<>();
final Set<RecipientId> retries = new HashSet<>();
@NonNull UnlistedResult build() {
return new UnlistedResult(potentiallyActiveIds, retries);
}
}
}
private static class AccountHolder {

View File

@@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.KbsEnclave;
import org.thoughtcrime.securesms.components.TypingStatusRepository;
import org.thoughtcrime.securesms.components.TypingStatusSender;
import org.thoughtcrime.securesms.database.DatabaseObserver;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.groups.GroupsV2Authorization;
import org.thoughtcrime.securesms.groups.GroupsV2AuthorizationMemoryValueCache;
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor;
@@ -18,7 +19,6 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
@@ -41,6 +41,7 @@ import org.whispersystems.signalservice.api.KeyBackupService;
import org.whispersystems.signalservice.api.SignalServiceAccountManager;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations;
import okhttp3.OkHttpClient;
@@ -90,6 +91,7 @@ public class ApplicationDependencies {
private static volatile OkHttpClient okHttpClient;
private static volatile PendingRetryReceiptManager pendingRetryReceiptManager;
private static volatile PendingRetryReceiptCache pendingRetryReceiptCache;
private static volatile SignalWebSocket signalWebSocket;
@MainThread
public static void init(@NonNull Application application, @NonNull Provider provider) {
@@ -184,12 +186,9 @@ public class ApplicationDependencies {
synchronized (LOCK) {
if (messageSender == null) {
messageSender = provider.provideSignalServiceMessageSender();
messageSender = provider.provideSignalServiceMessageSender(getSignalWebSocket());
} else {
messageSender.update(
IncomingMessageObserver.getPipe(),
IncomingMessageObserver.getUnidentifiedPipe(),
TextSecurePreferences.isMultiDevice(application));
messageSender.update(TextSecurePreferences.isMultiDevice(application));
}
return messageSender;
}
@@ -492,12 +491,22 @@ public class ApplicationDependencies {
return pendingRetryReceiptCache;
}
public static @NonNull SignalWebSocket getSignalWebSocket() {
if (signalWebSocket == null) {
synchronized (LOCK) {
if (signalWebSocket == null) {
signalWebSocket = provider.provideSignalWebSocket();
}
}
}
return signalWebSocket;
}
public interface Provider {
@NonNull PipeConnectivityListener providePipeListener();
@NonNull GroupsV2Operations provideGroupsV2Operations();
@NonNull SignalServiceAccountManager provideSignalServiceAccountManager();
@NonNull SignalServiceMessageSender provideSignalServiceMessageSender();
@NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket);
@NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver();
@NonNull SignalServiceNetworkAccess provideSignalServiceNetworkAccess();
@NonNull IncomingMessageProcessor provideIncomingMessageProcessor();
@@ -521,5 +530,6 @@ public class ApplicationDependencies {
@NonNull SignalCallManager provideSignalCallManager();
@NonNull PendingRetryReceiptManager providePendingRetryReceiptManager();
@NonNull PendingRetryReceiptCache providePendingRetryReceiptCache();
@NonNull SignalWebSocket provideSignalWebSocket();
}
}

View File

@@ -59,11 +59,14 @@ import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceAccountManager;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.groupsv2.ClientZkOperations;
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations;
import org.whispersystems.signalservice.api.util.CredentialsProvider;
import org.whispersystems.signalservice.api.util.SleepTimer;
import org.whispersystems.signalservice.api.util.UptimeSleepTimer;
import org.whispersystems.signalservice.api.websocket.WebSocketFactory;
import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
import java.util.UUID;
@@ -106,15 +109,14 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
}
@Override
public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender() {
public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket) {
return new SignalServiceMessageSender(provideSignalServiceNetworkAccess().getConfiguration(context),
new DynamicCredentialsProvider(context),
new SignalProtocolStoreImpl(context),
ReentrantSessionLock.INSTANCE,
BuildConfig.SIGNAL_AGENT,
TextSecurePreferences.isMultiDevice(context),
Optional.fromNullable(IncomingMessageObserver.getPipe()),
Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()),
signalWebSocket,
Optional.of(new SecurityEventListener(context)),
provideClientZkOperations().getProfileOperations(),
SignalExecutors.newCachedBoundedExecutor("signal-messages", 1, 16),
@@ -261,6 +263,41 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
return new PendingRetryReceiptCache(context);
}
@Override
public @NonNull SignalWebSocket provideSignalWebSocket() {
return new SignalWebSocket(provideWebSocketFactory());
}
private @NonNull WebSocketFactory provideWebSocketFactory() {
return new WebSocketFactory() {
@Override
public WebSocketConnection createWebSocket() {
SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context)
: new UptimeSleepTimer();
return new WebSocketConnection("normal",
provideSignalServiceNetworkAccess().getConfiguration(context),
Optional.of(new DynamicCredentialsProvider(context)),
BuildConfig.SIGNAL_AGENT,
pipeListener,
sleepTimer);
}
@Override
public WebSocketConnection createUnidentifiedWebSocket() {
SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context)
: new UptimeSleepTimer();
return new WebSocketConnection("unidentified",
provideSignalServiceNetworkAccess().getConfiguration(context),
Optional.absent(),
BuildConfig.SIGNAL_AGENT,
pipeListener,
sleepTimer);
}
};
}
private static class DynamicCredentialsProvider implements CredentialsProvider {
private final Context context;

View File

@@ -46,9 +46,8 @@ import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException;
import org.whispersystems.signalservice.api.crypto.ProfileCipher;
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential;
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile;
import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture;
import org.whispersystems.signalservice.api.services.ProfileService;
import org.whispersystems.signalservice.internal.ServiceResponse;
import java.io.IOException;
import java.util.ArrayList;
@@ -59,9 +58,10 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
/**
* Retrieves a users profile and sets the appropriate local fields.
@@ -86,7 +86,7 @@ public class RetrieveProfileJob extends BaseJob {
/**
* Submits the necessary job to refresh the profile of the requested recipient. Works for any
* RecipientId, including individuals, groups, or yourself.
*
* <p>
* Identical to {@link #enqueue(Set)})}
*/
@WorkerThread
@@ -169,7 +169,7 @@ public class RetrieveProfileJob extends BaseJob {
*/
public static void enqueueRoutineFetchIfNecessary(Application application) {
if (!SignalStore.registrationValues().isRegistrationComplete() ||
!TextSecurePreferences.isPushRegistered(application) ||
!TextSecurePreferences.isPushRegistered(application) ||
TextSecurePreferences.getLocalUuid(application) == null)
{
Log.i(TAG, "Registration not complete. Skipping.");
@@ -204,10 +204,9 @@ public class RetrieveProfileJob extends BaseJob {
}
private RetrieveProfileJob(@NonNull Set<RecipientId> recipientIds) {
this(new Job.Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setMaxAttempts(3)
.build(),
this(new Job.Parameters.Builder().addConstraint(NetworkConstraint.KEY)
.setMaxAttempts(3)
.build(),
recipientIds);
}
@@ -218,11 +217,10 @@ public class RetrieveProfileJob extends BaseJob {
@Override
public @NonNull Data serialize() {
return new Data.Builder()
.putStringListAsArray(KEY_RECIPIENTS, Stream.of(recipientIds)
.map(RecipientId::serialize)
.toList())
.build();
return new Data.Builder().putStringListAsArray(KEY_RECIPIENTS, Stream.of(recipientIds)
.map(RecipientId::serialize)
.toList())
.build();
}
@Override
@@ -244,8 +242,6 @@ public class RetrieveProfileJob extends BaseJob {
Stopwatch stopwatch = new Stopwatch("RetrieveProfile");
RecipientDatabase recipientDatabase = DatabaseFactory.getRecipientDatabase(context);
Set<RecipientId> retries = new HashSet<>();
Set<RecipientId> unregistered = new HashSet<>();
RecipientUtil.ensureUuidsAreAvailable(context, Stream.of(Recipient.resolvedList(recipientIds))
.filter(r -> r.getRegistered() != RecipientDatabase.RegisteredState.NOT_REGISTERED)
@@ -254,66 +250,64 @@ public class RetrieveProfileJob extends BaseJob {
List<Recipient> recipients = Recipient.resolvedList(recipientIds);
stopwatch.split("resolve-ensure");
List<Pair<Recipient, ListenableFuture<ProfileAndCredential>>> futures = Stream.of(recipients)
.filter(Recipient::hasServiceIdentifier)
.map(r -> new Pair<>(r, ProfileUtil.retrieveProfile(context, r, getRequestType(r))))
.toList();
stopwatch.split("futures");
ProfileService profileService = new ProfileService(ApplicationDependencies.getGroupsV2Operations().getProfileOperations(),
ApplicationDependencies.getSignalServiceMessageReceiver(),
ApplicationDependencies.getSignalWebSocket());
List<Pair<Recipient, ProfileAndCredential>> profiles = Stream.of(futures)
.map(pair -> {
Recipient recipient = pair.first();
List<Observable<Pair<Recipient, ServiceResponse<ProfileAndCredential>>>> requests = Stream.of(recipients)
.filter(Recipient::hasServiceIdentifier)
.map(r -> ProfileUtil.retrieveProfile(context, r, getRequestType(r), profileService).toObservable())
.toList();
stopwatch.split("requests");
try {
ProfileAndCredential profile = pair.second().get(10, TimeUnit.SECONDS);
return new Pair<>(recipient, profile);
} catch (InterruptedException | TimeoutException e) {
retries.add(recipient.getId());
} catch (ExecutionException e) {
if (e.getCause() instanceof PushNetworkException) {
retries.add(recipient.getId());
} else if (e.getCause() instanceof NotFoundException) {
Log.w(TAG, "Failed to find a profile for " + recipient.getId());
if (recipient.isRegistered()) {
unregistered.add(recipient.getId());
}
} else {
Log.w(TAG, "Failed to retrieve profile for " + recipient.getId());
}
}
return null;
})
.withoutNulls()
.toList();
stopwatch.split("network");
OperationState operationState = Observable.mergeDelayError(requests)
.observeOn(Schedulers.io(), true)
.scan(new OperationState(), (state, pair) -> {
Recipient recipient = pair.first();
ProfileService.ProfileResponseProcessor processor = new ProfileService.ProfileResponseProcessor(pair.second());
if (processor.hasResult()) {
state.profiles.add(processor.getResult(recipient));
process(recipient, processor.getResult());
} else if (processor.notFound()) {
Log.w(TAG, "Failed to find a profile for " + recipient.getId());
if (recipient.isRegistered()) {
state.unregistered.add(recipient.getId());
}
} else if (processor.genericIoError()) {
state.retries.add(recipient.getId());
} else {
Log.w(TAG, "Failed to retrieve profile for " + recipient.getId());
}
return state;
})
.lastOrError()
.blockingGet();
for (Pair<Recipient, ProfileAndCredential> profile : profiles) {
process(profile.first(), profile.second());
}
stopwatch.split("network-process");
Set<RecipientId> success = SetUtil.difference(recipientIds, retries);
Set<RecipientId> success = SetUtil.difference(recipientIds, operationState.retries);
recipientDatabase.markProfilesFetched(success, System.currentTimeMillis());
Map<RecipientId, String> newlyRegistered = Stream.of(profiles)
Map<RecipientId, String> newlyRegistered = Stream.of(operationState.profiles)
.map(Pair::first)
.filterNot(Recipient::isRegistered)
.collect(Collectors.toMap(Recipient::getId,
r -> r.getUuid().transform(UUID::toString).orNull()));
r -> r.getUuid().transform(UUID::toString).orNull()));
if (unregistered.size() > 0 || newlyRegistered.size() > 0) {
Log.i(TAG, "Marking " + newlyRegistered.size() + " users as registered and " + unregistered.size() + " users as unregistered.");
recipientDatabase.bulkUpdatedRegisteredStatus(newlyRegistered, unregistered);
if (operationState.unregistered.size() > 0 || newlyRegistered.size() > 0) {
Log.i(TAG, "Marking " + newlyRegistered.size() + " users as registered and " + operationState.unregistered.size() + " users as unregistered.");
recipientDatabase.bulkUpdatedRegisteredStatus(newlyRegistered, operationState.unregistered);
}
stopwatch.split("process");
long keyCount = Stream.of(profiles).map(Pair::first).map(Recipient::getProfileKey).withoutNulls().count();
Log.d(TAG, String.format(Locale.US, "Started with %d recipient(s). Found %d profile(s), and had keys for %d of them. Will retry %d.", recipients.size(), profiles.size(), keyCount, retries.size()));
long keyCount = Stream.of(operationState.profiles).map(Pair::first).map(Recipient::getProfileKey).withoutNulls().count();
Log.d(TAG, String.format(Locale.US, "Started with %d recipient(s). Found %d profile(s), and had keys for %d of them. Will retry %d.", recipients.size(), operationState.profiles.size(), keyCount, operationState.retries.size()));
stopwatch.stop(TAG);
recipientIds.clear();
recipientIds.addAll(retries);
recipientIds.addAll(operationState.retries);
if (recipientIds.size() > 0) {
throw new RetryLaterException();
@@ -329,8 +323,8 @@ public class RetrieveProfileJob extends BaseJob {
public void onFailure() {}
private void process(Recipient recipient, ProfileAndCredential profileAndCredential) {
SignalServiceProfile profile = profileAndCredential.getProfile();
ProfileKey recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
SignalServiceProfile profile = profileAndCredential.getProfile();
ProfileKey recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.getProfileKey());
setProfileName(recipient, profile.getName());
setProfileAbout(recipient, profile.getAbout(), profile.getAboutEmoji());
@@ -401,7 +395,7 @@ public class RetrieveProfileJob extends BaseJob {
}
} else {
ProfileCipher profileCipher = new ProfileCipher(profileKey);
boolean verifiedUnidentifiedAccess;
boolean verifiedUnidentifiedAccess;
try {
verifiedUnidentifiedAccess = profileCipher.verifyUnidentifiedAccess(Base64.decode(unidentifiedAccessVerifier));
@@ -436,9 +430,9 @@ public class RetrieveProfileJob extends BaseJob {
String remoteDisplayName = remoteProfileName.toString();
String localDisplayName = localProfileName.toString();
if (!recipient.isBlocked() &&
!recipient.isGroup() &&
!recipient.isSelf() &&
if (!recipient.isBlocked() &&
!recipient.isGroup() &&
!recipient.isSelf() &&
!localDisplayName.isEmpty() &&
!remoteDisplayName.equals(localDisplayName))
{
@@ -446,7 +440,7 @@ public class RetrieveProfileJob extends BaseJob {
DatabaseFactory.getSmsDatabase(context).insertProfileNameChangeMessages(recipient, remoteDisplayName, localDisplayName);
} else {
Log.i(TAG, String.format(Locale.US, "Name changed, but wasn't relevant to write an event. blocked: %s, group: %s, self: %s, firstSet: %s, displayChange: %s",
recipient.isBlocked(), recipient.isGroup(), recipient.isSelf(), localDisplayName.isEmpty(), !remoteDisplayName.equals(localDisplayName)));
recipient.isBlocked(), recipient.isGroup(), recipient.isSelf(), localDisplayName.isEmpty(), !remoteDisplayName.equals(localDisplayName)));
}
}
@@ -494,6 +488,15 @@ public class RetrieveProfileJob extends BaseJob {
DatabaseFactory.getRecipientDatabase(context).setCapabilities(recipient.getId(), capabilities);
}
/**
* Collective state as responses are processed as they come in.
*/
private static class OperationState {
final Set<RecipientId> retries = new HashSet<>();
final Set<RecipientId> unregistered = new HashSet<>();
final List<Pair<Recipient, ProfileAndCredential>> profiles = new ArrayList<>();
}
public static final class Factory implements Job.Factory<RetrieveProfileJob> {
@Override

View File

@@ -26,11 +26,10 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.util.AppForegroundObserver;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.InvalidVersionException;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import java.util.ArrayList;
import java.util.List;
@@ -48,9 +47,6 @@ public class IncomingMessageObserver {
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);
private static volatile SignalServiceMessagePipe pipe = null;
private static volatile SignalServiceMessagePipe unidentifiedPipe = null;
private final Application context;
private final SignalServiceNetworkAccess networkAccess;
private final List<Runnable> decryptionDrainedListeners;
@@ -96,7 +92,7 @@ public class IncomingMessageObserver {
Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state.");
networkDrained = false;
decryptionDrained = false;
shutdown(pipe, unidentifiedPipe);
shutdown();
}
IncomingMessageObserver.this.notifyAll();
}
@@ -176,40 +172,12 @@ public class IncomingMessageObserver {
SignalExecutors.BOUNDED.execute(() -> {
Log.w(TAG, "Beginning termination.");
terminated = true;
shutdown(pipe, unidentifiedPipe);
shutdown();
});
}
private void shutdown(@Nullable SignalServiceMessagePipe pipe, @Nullable SignalServiceMessagePipe unidentifiedPipe) {
try {
if (pipe != null) {
Log.w(TAG, "Shutting down normal pipe.");
pipe.shutdown();
} else {
Log.w(TAG, "No need to shutdown normal pipe, it doesn't exist.");
}
} catch (Throwable t) {
Log.w(TAG, "Closing normal pipe failed!", t);
}
try {
if (unidentifiedPipe != null) {
Log.w(TAG, "Shutting down unidentified pipe.");
unidentifiedPipe.shutdown();
} else {
Log.w(TAG, "No need to shutdown unidentified pipe, it doesn't exist.");
}
} catch (Throwable t) {
Log.w(TAG, "Closing unidentified pipe failed!", t);
}
}
public static @Nullable SignalServiceMessagePipe getPipe() {
return pipe;
}
public static @Nullable SignalServiceMessagePipe getUnidentifiedPipe() {
return unidentifiedPipe;
private void shutdown() {
ApplicationDependencies.getSignalWebSocket().disconnect();
}
private class MessageRetrievalThread extends Thread implements Thread.UncaughtExceptionHandler {
@@ -227,19 +195,14 @@ public class IncomingMessageObserver {
waitForConnectionNecessary();
Log.i(TAG, "Making websocket connection....");
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
pipe = receiver.createMessagePipe();
unidentifiedPipe = receiver.createUnidentifiedMessagePipe();
SignalServiceMessagePipe localPipe = pipe;
SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe;
SignalWebSocket signalWebSocket = ApplicationDependencies.getSignalWebSocket();
signalWebSocket.connect();
try {
while (isConnectionNecessary()) {
try {
Log.d(TAG, "Reading message...");
Optional<SignalServiceEnvelope> result = localPipe.readOrEmpty(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, envelope -> {
Optional<SignalServiceEnvelope> result = signalWebSocket.readOrEmpty(TimeUnit.MINUTES.toMillis(REQUEST_TIMEOUT_MINUTES), envelope -> {
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp());
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
@@ -251,6 +214,9 @@ public class IncomingMessageObserver {
networkDrained = true;
ApplicationDependencies.getJobManager().add(new PushDecryptDrainedJob());
}
} catch (WebSocketUnavailableException e) {
Log.i(TAG, "Pipe unexpectedly unavailable, connecting");
signalWebSocket.connect();
} catch (TimeoutException e) {
Log.w(TAG, "Application level read timeout...");
}
@@ -259,7 +225,7 @@ public class IncomingMessageObserver {
Log.w(TAG, e);
} finally {
Log.w(TAG, "Shutting down pipe...");
shutdown(localPipe, unidentifiedLocalPipe);
shutdown();
}
Log.i(TAG, "Looping...");

View File

@@ -6,28 +6,25 @@ import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.whispersystems.libsignal.InvalidVersionException;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class WebsocketStrategy extends MessageRetrievalStrategy {
private static final String TAG = Log.tag(WebsocketStrategy.class);
private final SignalServiceMessageReceiver receiver;
private final JobManager jobManager;
private final SignalWebSocket signalWebSocket;
private final JobManager jobManager;
public WebsocketStrategy() {
this.receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
this.jobManager = ApplicationDependencies.getJobManager();
this.signalWebSocket = ApplicationDependencies.getSignalWebSocket();
this.jobManager = ApplicationDependencies.getJobManager();
}
@Override
@@ -55,15 +52,15 @@ class WebsocketStrategy extends MessageRetrievalStrategy {
}
private @NonNull Set<String> drainWebsocket(long timeout, long startTime) throws IOException {
SignalServiceMessagePipe pipe = receiver.createMessagePipe();
QueueFindingJobListener queueListener = new QueueFindingJobListener();
jobManager.addListener(job -> job.getParameters().getQueue() != null && job.getParameters().getQueue().startsWith(PushProcessMessageJob.QUEUE_PREFIX), queueListener);
try {
signalWebSocket.connect();
while (shouldContinue()) {
try {
Optional<SignalServiceEnvelope> result = pipe.readOrEmpty(timeout, TimeUnit.MILLISECONDS, envelope -> {
Optional<SignalServiceEnvelope> result = signalWebSocket.readOrEmpty(timeout, envelope -> {
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp() + timeSuffix(startTime));
try (IncomingMessageProcessor.Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
@@ -79,7 +76,7 @@ class WebsocketStrategy extends MessageRetrievalStrategy {
}
}
} finally {
pipe.shutdown();
signalWebSocket.disconnect();
jobManager.removeListener(queueListener);
}

View File

@@ -22,7 +22,6 @@ import org.whispersystems.signalservice.api.payments.Money;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
final class ConfirmPaymentRepository {
@@ -54,7 +53,7 @@ final class ConfirmPaymentRepository {
recipientId = payee.requireRecipientId();
try {
mobileCoinPublicAddress = ProfileUtil.getAddressForRecipient(Recipient.resolved(recipientId));
} catch (InterruptedException | ExecutionException e) {
} catch (IOException e) {
Log.w(TAG, "Failed to get address for recipient " + recipientId);
consumer.accept(new ConfirmPaymentResult.Error());
return;

View File

@@ -16,7 +16,6 @@ import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.RecipientDatabase;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.payments.MobileCoinPublicAddress;
import org.thoughtcrime.securesms.payments.MobileCoinPublicAddressProfileUtil;
import org.thoughtcrime.securesms.payments.PaymentsAddressException;
@@ -27,10 +26,9 @@ import org.thoughtcrime.securesms.recipients.RecipientUtil;
import org.whispersystems.libsignal.IdentityKey;
import org.whispersystems.libsignal.IdentityKeyPair;
import org.whispersystems.libsignal.InvalidKeyException;
import org.whispersystems.libsignal.util.Pair;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceAccountManager;
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException;
import org.whispersystems.signalservice.api.crypto.ProfileCipher;
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess;
@@ -38,18 +36,14 @@ import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair;
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential;
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import org.whispersystems.signalservice.api.services.ProfileService;
import org.whispersystems.signalservice.api.util.StreamDetails;
import org.whispersystems.signalservice.internal.ServiceResponse;
import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
import org.whispersystems.signalservice.internal.util.concurrent.CascadingFuture;
import org.whispersystems.signalservice.internal.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.reactivex.rxjava3.core.Single;
/**
* Aids in the retrieval and decryption of profiles.
@@ -67,40 +61,26 @@ public final class ProfileUtil {
@NonNull SignalServiceProfile.RequestType requestType)
throws IOException
{
try {
return retrieveProfile(context, recipient, requestType).get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof PushNetworkException) {
throw (PushNetworkException) e.getCause();
} else if (e.getCause() instanceof NotFoundException) {
throw (NotFoundException) e.getCause();
} else {
throw new IOException(e);
}
} catch (InterruptedException | TimeoutException e) {
throw new PushNetworkException(e);
}
ProfileService profileService = new ProfileService(ApplicationDependencies.getGroupsV2Operations().getProfileOperations(),
ApplicationDependencies.getSignalServiceMessageReceiver(),
ApplicationDependencies.getSignalWebSocket());
Pair<Recipient, ServiceResponse<ProfileAndCredential>> response = retrieveProfile(context, recipient, requestType, profileService).blockingGet();
return new ProfileService.ProfileResponseProcessor(response.second()).getResultOrThrow();
}
public static @NonNull ListenableFuture<ProfileAndCredential> retrieveProfile(@NonNull Context context,
@NonNull Recipient recipient,
@NonNull SignalServiceProfile.RequestType requestType)
public static Single<Pair<Recipient, ServiceResponse<ProfileAndCredential>>> retrieveProfile(@NonNull Context context,
@NonNull Recipient recipient,
@NonNull SignalServiceProfile.RequestType requestType,
@NonNull ProfileService profileService)
{
SignalServiceAddress address = toSignalServiceAddress(context, recipient);
Optional<UnidentifiedAccess> unidentifiedAccess = getUnidentifiedAccess(context, recipient);
Optional<ProfileKey> profileKey = ProfileKeyUtil.profileKeyOptional(recipient.getProfileKey());
if (unidentifiedAccess.isPresent()) {
return new CascadingFuture<>(Arrays.asList(() -> getPipeRetrievalFuture(address, profileKey, unidentifiedAccess, requestType),
() -> getSocketRetrievalFuture(address, profileKey, unidentifiedAccess, requestType),
() -> getPipeRetrievalFuture(address, profileKey, Optional.absent(), requestType),
() -> getSocketRetrievalFuture(address, profileKey, Optional.absent(), requestType)),
e -> !(e instanceof NotFoundException));
} else {
return new CascadingFuture<>(Arrays.asList(() -> getPipeRetrievalFuture(address, profileKey, Optional.absent(), requestType),
() -> getSocketRetrievalFuture(address, profileKey, Optional.absent(), requestType)),
e -> !(e instanceof NotFoundException));
}
return profileService.getProfile(address, profileKey, unidentifiedAccess, requestType)
.map(p -> new Pair<>(recipient, p))
.onErrorReturn(t -> new Pair<>(recipient, ServiceResponse.forUnknownError(t)));
}
public static @Nullable String decryptString(@NonNull ProfileKey profileKey, @Nullable byte[] encryptedString)
@@ -126,7 +106,7 @@ public final class ProfileUtil {
@WorkerThread
public static @NonNull MobileCoinPublicAddress getAddressForRecipient(@NonNull Recipient recipient)
throws InterruptedException, ExecutionException, PaymentsAddressException
throws IOException, PaymentsAddressException
{
ProfileKey profileKey;
try {
@@ -135,7 +115,7 @@ public final class ProfileUtil {
Log.w(TAG, "Profile key not available for " + recipient.getId());
throw new PaymentsAddressException(PaymentsAddressException.Code.NO_PROFILE_KEY);
}
ProfileAndCredential profileAndCredential = ProfileUtil.retrieveProfile(ApplicationDependencies.getApplication(), recipient, SignalServiceProfile.RequestType.PROFILE).get();
ProfileAndCredential profileAndCredential = ProfileUtil.retrieveProfileSync(ApplicationDependencies.getApplication(), recipient, SignalServiceProfile.RequestType.PROFILE);
SignalServiceProfile profile = profileAndCredential.getProfile();
byte[] encryptedPaymentsAddress = profile.getPaymentAddress();
@@ -277,32 +257,6 @@ public final class ProfileUtil {
}
}
private static @NonNull ListenableFuture<ProfileAndCredential> getPipeRetrievalFuture(@NonNull SignalServiceAddress address,
@NonNull Optional<ProfileKey> profileKey,
@NonNull Optional<UnidentifiedAccess> unidentifiedAccess,
@NonNull SignalServiceProfile.RequestType requestType)
throws IOException
{
SignalServiceMessagePipe authPipe = IncomingMessageObserver.getPipe();
SignalServiceMessagePipe unidentifiedPipe = IncomingMessageObserver.getUnidentifiedPipe();
SignalServiceMessagePipe pipe = unidentifiedPipe != null && unidentifiedAccess.isPresent() ? unidentifiedPipe
: authPipe;
if (pipe != null) {
return pipe.getProfile(address, profileKey, unidentifiedAccess, requestType);
}
throw new IOException("No pipe available!");
}
private static @NonNull ListenableFuture<ProfileAndCredential> getSocketRetrievalFuture(@NonNull SignalServiceAddress address,
@NonNull Optional<ProfileKey> profileKey,
@NonNull Optional<UnidentifiedAccess> unidentifiedAccess,
@NonNull SignalServiceProfile.RequestType requestType)
{
SignalServiceMessageReceiver receiver = ApplicationDependencies.getSignalServiceMessageReceiver();
return receiver.retrieveProfile(address, profileKey, unidentifiedAccess, requestType);
}
private static Optional<UnidentifiedAccess> getUnidentifiedAccess(@NonNull Context context, @NonNull Recipient recipient) {
Optional<UnidentifiedAccessPair> unidentifiedAccess = UnidentifiedAccessUtil.getAccessFor(context, recipient, false);