diff --git a/app/build.gradle b/app/build.gradle index c12b738205..875f5aad7b 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -392,6 +392,7 @@ dependencies { implementation 'androidx.lifecycle:lifecycle-extensions:2.1.0' implementation 'androidx.lifecycle:lifecycle-viewmodel-savedstate:1.0.0-alpha05' implementation 'androidx.lifecycle:lifecycle-common-java8:2.1.0' + implementation 'androidx.lifecycle:lifecycle-reactivestreams-ktx:2.1.0' implementation "androidx.camera:camera-core:1.0.0-beta11" implementation "androidx.camera:camera-camera2:1.0.0-beta11" implementation "androidx.camera:camera-lifecycle:1.0.0-beta11" diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListFragment.java b/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListFragment.java index 3d2ecc6c29..50f0ca93bb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListFragment.java +++ b/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListFragment.java @@ -112,7 +112,6 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneActionController; import org.thoughtcrime.securesms.megaphone.MegaphoneViewBuilder; import org.thoughtcrime.securesms.megaphone.Megaphones; import org.thoughtcrime.securesms.mms.GlideApp; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.notifications.MarkReadReceiver; import org.thoughtcrime.securesms.payments.preferences.PaymentsActivity; import org.thoughtcrime.securesms.payments.preferences.details.PaymentDetailsFragmentArgs; @@ -143,6 +142,7 @@ import org.thoughtcrime.securesms.util.concurrent.SimpleTask; import org.thoughtcrime.securesms.util.task.SnackbarAsyncTask; import org.thoughtcrime.securesms.util.views.Stub; import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import java.util.Collections; import java.util.HashSet; @@ -968,19 +968,21 @@ public class ConversationListFragment extends MainFragment implements ActionMode } } - private void updateProxyStatus(@NonNull PipeConnectivityListener.State state) { + private void updateProxyStatus(@NonNull WebSocketConnectionState state) { if (SignalStore.proxy().isProxyEnabled()) { proxyStatus.setVisibility(View.VISIBLE); switch (state) { case CONNECTING: + case DISCONNECTING: case DISCONNECTED: proxyStatus.setImageResource(R.drawable.ic_proxy_connecting_24); break; case CONNECTED: proxyStatus.setImageResource(R.drawable.ic_proxy_connected_24); break; - case FAILURE: + case AUTHENTICATION_FAILED: + case FAILED: proxyStatus.setImageResource(R.drawable.ic_proxy_failed_24); break; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListViewModel.java b/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListViewModel.java index 39dc7a0652..d33cf8fe72 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListViewModel.java +++ b/app/src/main/java/org/thoughtcrime/securesms/conversationlist/ConversationListViewModel.java @@ -5,6 +5,7 @@ import android.text.TextUtils; import androidx.annotation.NonNull; import androidx.lifecycle.LiveData; +import androidx.lifecycle.LiveDataReactiveStreams; import androidx.lifecycle.MutableLiveData; import androidx.lifecycle.ViewModel; import androidx.lifecycle.ViewModelProvider; @@ -14,7 +15,6 @@ import org.signal.paging.PagedData; import org.signal.paging.PagingConfig; import org.signal.paging.PagingController; import org.thoughtcrime.securesms.conversationlist.model.Conversation; -import org.thoughtcrime.securesms.search.SearchResult; import org.thoughtcrime.securesms.conversationlist.model.UnreadPayments; import org.thoughtcrime.securesms.conversationlist.model.UnreadPaymentsLiveData; import org.thoughtcrime.securesms.database.DatabaseFactory; @@ -23,17 +23,20 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.megaphone.Megaphone; import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.megaphone.Megaphones; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.payments.UnreadPaymentsRepository; import org.thoughtcrime.securesms.search.SearchRepository; +import org.thoughtcrime.securesms.search.SearchResult; import org.thoughtcrime.securesms.util.Debouncer; import org.thoughtcrime.securesms.util.ThrottledDebouncer; import org.thoughtcrime.securesms.util.livedata.LiveDataUtil; import org.thoughtcrime.securesms.util.paging.Invalidator; import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import java.util.List; +import io.reactivex.rxjava3.core.BackpressureStrategy; + class ConversationListViewModel extends ViewModel { private static final String TAG = Log.tag(ConversationListViewModel.class); @@ -117,8 +120,8 @@ class ConversationListViewModel extends ViewModel { return pagedData.getController(); } - @NonNull LiveData getPipeState() { - return ApplicationDependencies.getPipeListener().getState(); + @NonNull LiveData getPipeState() { + return LiveDataReactiveStreams.fromPublisher(ApplicationDependencies.getSignalWebSocket().getWebSocketState().toFlowable(BackpressureStrategy.LATEST)); } @NonNull LiveData> getUnreadPaymentsLiveData() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java index 8b7477e468..865961e39b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencies.java @@ -19,7 +19,6 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.payments.Payments; @@ -112,10 +111,6 @@ public class ApplicationDependencies { return application; } - public static @NonNull PipeConnectivityListener getPipeListener() { - return provider.providePipeListener(); - } - public static @NonNull SignalServiceAccountManager getSignalServiceAccountManager() { SignalServiceAccountManager local = accountManager; @@ -227,7 +222,6 @@ public class ApplicationDependencies { public static void resetNetworkConnectionsAfterProxyChange() { synchronized (LOCK) { - getPipeListener().reset(); closeConnections(); } } @@ -509,7 +503,6 @@ public class ApplicationDependencies { } public interface Provider { - @NonNull PipeConnectivityListener providePipeListener(); @NonNull GroupsV2Operations provideGroupsV2Operations(); @NonNull SignalServiceAccountManager provideSignalServiceAccountManager(); @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index 93f3893daf..e98c02131b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -6,7 +6,6 @@ import android.content.Context; import androidx.annotation.NonNull; import org.signal.core.util.concurrent.SignalExecutors; -import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.BuildConfig; import org.thoughtcrime.securesms.components.TypingStatusRepository; import org.thoughtcrime.securesms.components.TypingStatusSender; @@ -14,6 +13,7 @@ import org.thoughtcrime.securesms.crypto.ReentrantSessionLock; import org.thoughtcrime.securesms.crypto.storage.SignalProtocolStoreImpl; import org.thoughtcrime.securesms.database.DatabaseObserver; import org.thoughtcrime.securesms.database.JobDatabase; +import org.thoughtcrime.securesms.database.PendingRetryReceiptCache; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobMigrator; import org.thoughtcrime.securesms.jobmanager.impl.FactoryJobPredicate; @@ -33,8 +33,7 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneRepository; import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever; import org.thoughtcrime.securesms.messages.IncomingMessageObserver; import org.thoughtcrime.securesms.messages.IncomingMessageProcessor; -import org.thoughtcrime.securesms.database.PendingRetryReceiptCache; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; +import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor; import org.thoughtcrime.securesms.notifications.MessageNotifier; import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier; import org.thoughtcrime.securesms.payments.MobileCoinConfig; @@ -75,25 +74,16 @@ import java.util.UUID; */ public class ApplicationDependencyProvider implements ApplicationDependencies.Provider { - private static final String TAG = Log.tag(ApplicationDependencyProvider.class); - - private final Application context; - private final PipeConnectivityListener pipeListener; + private final Application context; public ApplicationDependencyProvider(@NonNull Application context) { - this.context = context; - this.pipeListener = new PipeConnectivityListener(context); + this.context = context; } private @NonNull ClientZkOperations provideClientZkOperations() { return ClientZkOperations.create(provideSignalServiceNetworkAccess().getConfiguration(context)); } - @Override - public @NonNull PipeConnectivityListener providePipeListener() { - return pipeListener; - } - @Override public @NonNull GroupsV2Operations provideGroupsV2Operations() { return new GroupsV2Operations(provideClientZkOperations()); @@ -126,13 +116,9 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr @Override public @NonNull SignalServiceMessageReceiver provideSignalServiceMessageReceiver() { - SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) - : new UptimeSleepTimer(); return new SignalServiceMessageReceiver(provideSignalServiceNetworkAccess().getConfiguration(context), new DynamicCredentialsProvider(context), BuildConfig.SIGNAL_AGENT, - pipeListener, - sleepTimer, provideClientZkOperations().getProfileOperations(), FeatureFlags.okHttpAutomaticRetry()); } @@ -265,35 +251,33 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr @Override public @NonNull SignalWebSocket provideSignalWebSocket() { - return new SignalWebSocket(provideWebSocketFactory()); + SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) : new UptimeSleepTimer(); + SignalWebSocketHealthMonitor healthMonitor = new SignalWebSocketHealthMonitor(context, sleepTimer); + SignalWebSocket signalWebSocket = new SignalWebSocket(provideWebSocketFactory(healthMonitor)); + + healthMonitor.monitor(signalWebSocket); + + return signalWebSocket; } - private @NonNull WebSocketFactory provideWebSocketFactory() { + private @NonNull WebSocketFactory provideWebSocketFactory(@NonNull SignalWebSocketHealthMonitor healthMonitor) { return new WebSocketFactory() { @Override public WebSocketConnection createWebSocket() { - SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) - : new UptimeSleepTimer(); - return new WebSocketConnection("normal", provideSignalServiceNetworkAccess().getConfiguration(context), Optional.of(new DynamicCredentialsProvider(context)), BuildConfig.SIGNAL_AGENT, - pipeListener, - sleepTimer); + healthMonitor); } @Override public WebSocketConnection createUnidentifiedWebSocket() { - SleepTimer sleepTimer = TextSecurePreferences.isFcmDisabled(context) ? new AlarmSleepTimer(context) - : new UptimeSleepTimer(); - return new WebSocketConnection("unidentified", provideSignalServiceNetworkAccess().getConfiguration(context), Optional.absent(), BuildConfig.SIGNAL_AGENT, - pipeListener, - sleepTimer); + healthMonitor); } }; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java index acf65d4f88..db2ad01057 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.java @@ -14,10 +14,12 @@ import androidx.annotation.Nullable; import androidx.core.app.NotificationCompat; import androidx.core.content.ContextCompat; +import org.signal.core.util.ThreadUtil; import org.signal.core.util.concurrent.SignalExecutors; import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.R; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; import org.thoughtcrime.securesms.jobs.PushDecryptDrainedJob; import org.thoughtcrime.securesms.keyvalue.SignalStore; @@ -50,6 +52,7 @@ public class IncomingMessageObserver { private final Application context; private final SignalServiceNetworkAccess networkAccess; private final List decryptionDrainedListeners; + private final BroadcastReceiver connectionReceiver; private boolean appVisible; @@ -84,7 +87,7 @@ public class IncomingMessageObserver { } }); - context.registerReceiver(new BroadcastReceiver() { + connectionReceiver = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { synchronized (IncomingMessageObserver.this) { @@ -92,12 +95,14 @@ public class IncomingMessageObserver { Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state."); networkDrained = false; decryptionDrained = false; - shutdown(); + disconnect(); } IncomingMessageObserver.this.notifyAll(); } } - }, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); + }; + + context.registerReceiver(connectionReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); } public synchronized void notifyRegistrationChanged() { @@ -169,14 +174,16 @@ public class IncomingMessageObserver { public void terminateAsync() { INSTANCE_COUNT.decrementAndGet(); + context.unregisterReceiver(connectionReceiver); + SignalExecutors.BOUNDED.execute(() -> { Log.w(TAG, "Beginning termination."); terminated = true; - shutdown(); + disconnect(); }); } - private void shutdown() { + private void disconnect() { ApplicationDependencies.getSignalWebSocket().disconnect(); } @@ -190,8 +197,15 @@ public class IncomingMessageObserver { @Override public void run() { + int attempts = 0; + while (!terminated) { Log.i(TAG, "Waiting for websocket state change...."); + if (attempts > 1) { + long backoff = BackoffUtil.exponentialBackoff(attempts, TimeUnit.SECONDS.toMillis(30)); + Log.w(TAG, "Too many failed connection attempts, attempts: " + attempts + " backing off: " + backoff); + ThreadUtil.sleep(backoff); + } waitForConnectionNecessary(); Log.i(TAG, "Making websocket connection...."); @@ -208,6 +222,7 @@ public class IncomingMessageObserver { processor.processEnvelope(envelope); } }); + attempts = 0; if (!result.isPresent() && !networkDrained) { Log.i(TAG, "Network was newly-drained. Enqueuing a job to listen for decryption draining."); @@ -219,13 +234,15 @@ public class IncomingMessageObserver { signalWebSocket.connect(); } catch (TimeoutException e) { Log.w(TAG, "Application level read timeout..."); + attempts = 0; } } } catch (Throwable e) { + attempts++; Log.w(TAG, e); } finally { Log.w(TAG, "Shutting down pipe..."); - shutdown(); + disconnect(); } Log.i(TAG, "Looping..."); diff --git a/app/src/main/java/org/thoughtcrime/securesms/net/HttpErrorTracker.java b/app/src/main/java/org/thoughtcrime/securesms/net/HttpErrorTracker.java new file mode 100644 index 0000000000..67ea055665 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/net/HttpErrorTracker.java @@ -0,0 +1,44 @@ +package org.thoughtcrime.securesms.net; + +import java.util.Arrays; + +/** + * Track error occurrences by time and indicate if too many occur within the + * time limit. + */ +public final class HttpErrorTracker { + + private final long[] timestamps; + private final long errorTimeRange; + + public HttpErrorTracker(int samples, long errorTimeRange) { + this.timestamps = new long[samples]; + this.errorTimeRange = errorTimeRange; + } + + public synchronized boolean addSample(long now) { + long errorsMustBeAfter = now - errorTimeRange; + int count = 1; + int minIndex = 0; + + for (int i = 0; i < timestamps.length; i++) { + if (timestamps[i] < errorsMustBeAfter) { + timestamps[i] = 0; + } else if (timestamps[i] != 0) { + count++; + } + + if (timestamps[i] < timestamps[minIndex]) { + minIndex = i; + } + } + + timestamps[minIndex] = now; + + if (count >= timestamps.length) { + Arrays.fill(timestamps, 0); + return true; + } + return false; + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/net/PipeConnectivityListener.java b/app/src/main/java/org/thoughtcrime/securesms/net/PipeConnectivityListener.java deleted file mode 100644 index dc837f3000..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/net/PipeConnectivityListener.java +++ /dev/null @@ -1,89 +0,0 @@ -package org.thoughtcrime.securesms.net; - -import android.app.Application; - -import androidx.annotation.NonNull; - -import org.greenrobot.eventbus.EventBus; -import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.events.ReminderUpdateEvent; -import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.util.DefaultValueLiveData; -import org.thoughtcrime.securesms.util.TextSecurePreferences; -import org.whispersystems.signalservice.api.websocket.ConnectivityListener; - -import okhttp3.Response; - -/** - * Our standard listener for reacting to the state of the websocket. Translates the state into a - * LiveData for observation. - */ -public class PipeConnectivityListener implements ConnectivityListener { - - private static final String TAG = Log.tag(PipeConnectivityListener.class); - - private final Application application; - private final DefaultValueLiveData state; - - public PipeConnectivityListener(@NonNull Application application) { - this.application = application; - this.state = new DefaultValueLiveData<>(State.DISCONNECTED); - } - - @Override - public void onConnected() { - Log.i(TAG, "onConnected()"); - TextSecurePreferences.setUnauthorizedReceived(application, false); - state.postValue(State.CONNECTED); - } - - @Override - public void onConnecting() { - Log.i(TAG, "onConnecting()"); - state.postValue(State.CONNECTING); - } - - @Override - public void onDisconnected() { - Log.w(TAG, "onDisconnected()"); - - if (state.getValue() != State.FAILURE) { - state.postValue(State.DISCONNECTED); - } - } - - @Override - public void onAuthenticationFailure() { - Log.w(TAG, "onAuthenticationFailure()"); - TextSecurePreferences.setUnauthorizedReceived(application, true); - EventBus.getDefault().post(new ReminderUpdateEvent()); - state.postValue(State.FAILURE); - } - - @Override - public boolean onGenericFailure(Response response, Throwable throwable) { - Log.w(TAG, "onGenericFailure() Response: " + response, throwable); - state.postValue(State.FAILURE); - - if (SignalStore.proxy().isProxyEnabled()) { - Log.w(TAG, "Encountered an error while we had a proxy set! Terminating the connection to prevent retry spam."); - ApplicationDependencies.closeConnections(); - return false; - } else { - return true; - } - } - - public void reset() { - state.postValue(State.DISCONNECTED); - } - - public @NonNull DefaultValueLiveData getState() { - return state; - } - - public enum State { - DISCONNECTED, CONNECTING, CONNECTED, FAILURE - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java new file mode 100644 index 0000000000..9fc7012e6b --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java @@ -0,0 +1,171 @@ +package org.thoughtcrime.securesms.net; + +import android.app.Application; + +import androidx.annotation.NonNull; + +import org.greenrobot.eventbus.EventBus; +import org.signal.core.util.logging.Log; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.events.ReminderUpdateEvent; +import org.thoughtcrime.securesms.keyvalue.SignalStore; +import org.thoughtcrime.securesms.util.TextSecurePreferences; +import org.whispersystems.libsignal.util.guava.Preconditions; +import org.whispersystems.signalservice.api.SignalWebSocket; +import org.whispersystems.signalservice.api.util.SleepTimer; +import org.whispersystems.signalservice.api.websocket.HealthMonitor; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; +import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.reactivex.rxjava3.schedulers.Schedulers; + +/** + * Monitors the health of the identified and unidentified WebSockets. If either one appears to be + * unhealthy, will trigger restarting both. + *

+ * The monitor is also responsible for sending heartbeats/keep-alive messages to prevent + * timeouts. + */ +public final class SignalWebSocketHealthMonitor implements HealthMonitor { + + private static final String TAG = Log.tag(SignalWebSocketHealthMonitor.class); + + private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS); + private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3; + + private final Application context; + private SignalWebSocket signalWebSocket; + private final SleepTimer sleepTimer; + + private volatile KeepAliveSender keepAliveSender; + + private final HealthState identified = new HealthState(); + private final HealthState unidentified = new HealthState(); + + public SignalWebSocketHealthMonitor(@NonNull Application context, @NonNull SleepTimer sleepTimer) { + this.context = context; + this.sleepTimer = sleepTimer; + } + + public void monitor(@NonNull SignalWebSocket signalWebSocket) { + Preconditions.checkNotNull(signalWebSocket); + Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once"); + + this.signalWebSocket = signalWebSocket; + + //noinspection ResultOfMethodCallIgnored + signalWebSocket.getWebSocketState() + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(s -> onStateChange(s, identified)); + + //noinspection ResultOfMethodCallIgnored + signalWebSocket.getUnidentifiedWebSocketState() + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(s -> onStateChange(s, unidentified)); + } + + private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) { + switch (connectionState) { + case CONNECTED: + TextSecurePreferences.setUnauthorizedReceived(context, false); + break; + case AUTHENTICATION_FAILED: + TextSecurePreferences.setUnauthorizedReceived(context, true); + EventBus.getDefault().post(new ReminderUpdateEvent()); + break; + case FAILED: + if (SignalStore.proxy().isProxyEnabled()) { + Log.w(TAG, "Encountered an error while we had a proxy set! Terminating the connection to prevent retry spam."); + ApplicationDependencies.closeConnections(); + } + break; + } + + healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED; + + if (keepAliveSender == null && isKeepAliveNecessary()) { + keepAliveSender = new KeepAliveSender(); + keepAliveSender.start(); + } else if (keepAliveSender != null && !isKeepAliveNecessary()) { + keepAliveSender.shutdown(); + keepAliveSender = null; + } + } + + @Override + public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) { + if (isIdentifiedWebSocket) { + identified.lastKeepAliveReceived = System.currentTimeMillis(); + } else { + unidentified.lastKeepAliveReceived = System.currentTimeMillis(); + } + } + + @Override + public void onMessageError(int status, boolean isIdentifiedWebSocket) { + if (status == 409) { + HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified); + if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) { + Log.w(TAG, "Received too many mismatch device errors, forcing new websockets."); + signalWebSocket.forceNewWebSockets(); + } + } + } + + private boolean isKeepAliveNecessary() { + return identified.needsKeepAlive || unidentified.needsKeepAlive; + } + + private static class HealthState { + private final HttpErrorTracker mismatchErrorTracker = new HttpErrorTracker(5, TimeUnit.MINUTES.toMillis(1)); + + private volatile boolean needsKeepAlive; + private volatile long lastKeepAliveReceived; + } + + /** + * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If + * either WebSocket fails 3 times to get a return heartbeat both are forced to be recreated. + */ + private class KeepAliveSender extends Thread { + + private volatile boolean shouldKeepRunning = true; + + public void run() { + identified.lastKeepAliveReceived = System.currentTimeMillis(); + unidentified.lastKeepAliveReceived = System.currentTimeMillis(); + + while (shouldKeepRunning && isKeepAliveNecessary()) { + try { + sleepTimer.sleep(KEEP_ALIVE_SEND_CADENCE); + + if (shouldKeepRunning && isKeepAliveNecessary()) { + long keepAliveRequiredSinceTime = System.currentTimeMillis() - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE; + + if (identified.lastKeepAliveReceived < keepAliveRequiredSinceTime || unidentified.lastKeepAliveReceived < keepAliveRequiredSinceTime) { + Log.w(TAG, "Missed keep alives, identified last: " + identified.lastKeepAliveReceived + + " unidentified last: " + unidentified.lastKeepAliveReceived + + " needed by: " + keepAliveRequiredSinceTime); + signalWebSocket.forceNewWebSockets(); + } else { + signalWebSocket.sendKeepAlive(); + } + } + } catch (Throwable e) { + Log.w(TAG, e); + } + } + } + + public void shutdown() { + shouldKeepRunning = false; + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyFragment.java b/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyFragment.java index 8d11b261a3..355b8d4d62 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyFragment.java +++ b/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyFragment.java @@ -21,13 +21,13 @@ import com.dd.CircularProgressButton; import org.thoughtcrime.securesms.R; import org.thoughtcrime.securesms.contactshare.SimpleTextWatcher; import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.util.CommunicationActions; import org.thoughtcrime.securesms.util.SignalProxyUtil; import org.thoughtcrime.securesms.util.Util; import org.thoughtcrime.securesms.util.ViewUtil; import org.thoughtcrime.securesms.util.views.LearnMoreTextView; import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.internal.configuration.SignalProxy; public class EditProxyFragment extends Fragment { @@ -118,10 +118,11 @@ public class EditProxyFragment extends Fragment { } } - private void presentProxyState(@NonNull PipeConnectivityListener.State proxyState) { + private void presentProxyState(@NonNull WebSocketConnectionState proxyState) { if (SignalStore.proxy().getProxy() != null) { switch (proxyState) { case DISCONNECTED: + case DISCONNECTING: case CONNECTING: proxyStatus.setText(R.string.preferences_connecting_to_proxy); proxyStatus.setTextColor(getResources().getColor(R.color.signal_text_secondary)); @@ -130,7 +131,8 @@ public class EditProxyFragment extends Fragment { proxyStatus.setText(R.string.preferences_connected_to_proxy); proxyStatus.setTextColor(getResources().getColor(R.color.signal_accent_green)); break; - case FAILURE: + case AUTHENTICATION_FAILED: + case FAILED: proxyStatus.setText(R.string.preferences_connection_failed); proxyStatus.setTextColor(getResources().getColor(R.color.signal_alert_primary)); break; diff --git a/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyViewModel.java b/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyViewModel.java index 98f5c1a3d0..7823ce5b50 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyViewModel.java +++ b/app/src/main/java/org/thoughtcrime/securesms/preferences/EditProxyViewModel.java @@ -8,28 +8,34 @@ import androidx.lifecycle.ViewModel; import org.signal.core.util.concurrent.SignalExecutors; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.util.SignalProxyUtil; import org.thoughtcrime.securesms.util.SingleLiveEvent; import org.thoughtcrime.securesms.util.TextSecurePreferences; import org.thoughtcrime.securesms.util.Util; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.internal.configuration.SignalProxy; import java.util.concurrent.TimeUnit; +import io.reactivex.rxjava3.core.BackpressureStrategy; + +import static androidx.lifecycle.LiveDataReactiveStreams.fromPublisher; + public class EditProxyViewModel extends ViewModel { - private final SingleLiveEvent events; - private final MutableLiveData uiState; - private final MutableLiveData saveState; - private final LiveData pipeState; + private final SingleLiveEvent events; + private final MutableLiveData uiState; + private final MutableLiveData saveState; + private final LiveData pipeState; public EditProxyViewModel() { this.events = new SingleLiveEvent<>(); this.uiState = new MutableLiveData<>(); this.saveState = new MutableLiveData<>(SaveState.IDLE); this.pipeState = TextSecurePreferences.getLocalNumber(ApplicationDependencies.getApplication()) == null ? new MutableLiveData<>() - : ApplicationDependencies.getPipeListener().getState(); + : fromPublisher(ApplicationDependencies.getSignalWebSocket() + .getWebSocketState() + .toFlowable(BackpressureStrategy.LATEST)); if (SignalStore.proxy().isProxyEnabled()) { uiState.setValue(UiState.ALL_ENABLED); @@ -81,7 +87,7 @@ public class EditProxyViewModel extends ViewModel { return events; } - @NonNull LiveData getProxyState() { + @NonNull LiveData getProxyState() { return pipeState; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/SignalProxyUtil.java b/app/src/main/java/org/thoughtcrime/securesms/util/SignalProxyUtil.java index a39f1dae30..a320e9f761 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/SignalProxyUtil.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/SignalProxyUtil.java @@ -3,17 +3,15 @@ package org.thoughtcrime.securesms.util; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.WorkerThread; -import androidx.lifecycle.Observer; import org.conscrypt.Conscrypt; -import org.signal.core.util.ThreadUtil; import org.signal.core.util.concurrent.SignalExecutors; import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.keyvalue.SignalStore; -import org.thoughtcrime.securesms.net.PipeConnectivityListener; import org.thoughtcrime.securesms.push.AccountManagerFactory; import org.whispersystems.signalservice.api.SignalServiceAccountManager; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.internal.configuration.SignalProxy; import java.io.IOException; @@ -23,6 +21,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; + public final class SignalProxyUtil { private static final String TAG = Log.tag(SignalProxyUtil.class); @@ -35,7 +36,7 @@ public final class SignalProxyUtil { private SignalProxyUtil() {} public static void startListeningToWebsocket() { - if (SignalStore.proxy().isProxyEnabled() && ApplicationDependencies.getPipeListener().getState().getValue() == PipeConnectivityListener.State.FAILURE) { + if (SignalStore.proxy().isProxyEnabled() && ApplicationDependencies.getSignalWebSocket().getWebSocketState().firstOrError().blockingGet().isFailure()) { Log.w(TAG, "Proxy is in a failed state. Restarting."); ApplicationDependencies.closeConnections(); } @@ -81,30 +82,16 @@ public final class SignalProxyUtil { return testWebsocketConnectionUnregistered(timeout); } - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean success = new AtomicBoolean(false); - - Observer observer = state -> { - if (state == PipeConnectivityListener.State.CONNECTED) { - success.set(true); - latch.countDown(); - } else if (state == PipeConnectivityListener.State.FAILURE) { - success.set(false); - latch.countDown(); - } - }; - - ThreadUtil.runOnMainSync(() -> ApplicationDependencies.getPipeListener().getState().observeForever(observer)); - - try { - latch.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Log.w(TAG, "Interrupted!", e); - } finally { - ThreadUtil.runOnMainSync(() -> ApplicationDependencies.getPipeListener().getState().removeObserver(observer)); - } - - return success.get(); + return ApplicationDependencies.getSignalWebSocket() + .getWebSocketState() + .subscribeOn(Schedulers.trampoline()) + .observeOn(Schedulers.trampoline()) + .timeout(timeout, TimeUnit.MILLISECONDS) + .skipWhile(state -> state != WebSocketConnectionState.CONNECTED && !state.isFailure()) + .firstOrError() + .flatMap(state -> Single.just(state == WebSocketConnectionState.CONNECTED)) + .onErrorReturn(t -> false) + .blockingGet(); } /** diff --git a/app/src/test/java/org/thoughtcrime/securesms/net/HttpErrorTrackerTest.java b/app/src/test/java/org/thoughtcrime/securesms/net/HttpErrorTrackerTest.java new file mode 100644 index 0000000000..ae5383e3e9 --- /dev/null +++ b/app/src/test/java/org/thoughtcrime/securesms/net/HttpErrorTrackerTest.java @@ -0,0 +1,29 @@ +package org.thoughtcrime.securesms.net; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class HttpErrorTrackerTest { + + private static final long START_TIME = TimeUnit.MINUTES.toMillis(60); + private static final long SHORT_TIME = TimeUnit.SECONDS.toMillis(1); + private static final long LONG_TIME = TimeUnit.SECONDS.toMillis(30); + private static final long REALLY_LONG_TIME = TimeUnit.SECONDS.toMillis(90); + + @Test + public void addSample() { + HttpErrorTracker tracker = new HttpErrorTracker(2, TimeUnit.MINUTES.toMillis(1)); + // First sample + assertFalse(tracker.addSample(START_TIME)); + // Second sample within 1 minute + assertTrue(tracker.addSample(START_TIME + SHORT_TIME)); + // Reset, new first sample + assertFalse(tracker.addSample(START_TIME + SHORT_TIME + LONG_TIME)); + // Second sample more than 1 minute after + assertFalse(tracker.addSample(START_TIME + SHORT_TIME + LONG_TIME + REALLY_LONG_TIME)); + } +} \ No newline at end of file diff --git a/app/witness-verifications.gradle b/app/witness-verifications.gradle index 70c3c1d7cd..96d25af27b 100644 --- a/app/witness-verifications.gradle +++ b/app/witness-verifications.gradle @@ -141,6 +141,12 @@ dependencyVerification { ['androidx.lifecycle:lifecycle-process:2.1.0', '8cddd0c7f4927bbf71fb71fca000786df82cc597c99463d6916ccbe4a205a9ac'], + ['androidx.lifecycle:lifecycle-reactivestreams-ktx:2.1.0', + '8729895193f5fa36814bd399c866a81a87270b802979869b1082127281bd2836'], + + ['androidx.lifecycle:lifecycle-reactivestreams:2.1.0', + '4b974fa3e691a6d205fccecfa930aa7880aabf9d80159de91bbaed1d351bc4ab'], + ['androidx.lifecycle:lifecycle-runtime-ktx:2.3.1', '7ad2987dd7f4075c0871a72cf07e9649d9cd790fc23dfab1972eca4710373873'], diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java index 5cc9b06509..05cf78235b 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageReceiver.java @@ -23,9 +23,7 @@ import org.whispersystems.signalservice.api.profiles.SignalServiceProfile; import org.whispersystems.signalservice.api.push.SignalServiceAddress; import org.whispersystems.signalservice.api.push.exceptions.MissingConfigurationException; import org.whispersystems.signalservice.api.util.CredentialsProvider; -import org.whispersystems.signalservice.api.util.SleepTimer; import org.whispersystems.signalservice.api.util.UuidUtil; -import org.whispersystems.signalservice.api.websocket.ConnectivityListener; import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration; import org.whispersystems.signalservice.internal.push.PushServiceSocket; import org.whispersystems.signalservice.internal.push.SignalServiceEnvelopeEntity; @@ -50,16 +48,9 @@ import java.util.UUID; * * @author Moxie Marlinspike */ -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class SignalServiceMessageReceiver { - private final PushServiceSocket socket; - private final SignalServiceConfiguration urls; - private final CredentialsProvider credentialsProvider; - private final String signalAgent; - private final ConnectivityListener connectivityListener; - private final SleepTimer sleepTimer; - private final ClientZkProfileOperations clientZkProfileOperations; + private final PushServiceSocket socket; /** * Construct a SignalServiceMessageReceiver. @@ -70,18 +61,10 @@ public class SignalServiceMessageReceiver { public SignalServiceMessageReceiver(SignalServiceConfiguration urls, CredentialsProvider credentials, String signalAgent, - ConnectivityListener listener, - SleepTimer timer, ClientZkProfileOperations clientZkProfileOperations, boolean automaticNetworkRetry) { - this.urls = urls; - this.credentialsProvider = credentials; - this.socket = new PushServiceSocket(urls, credentials, signalAgent, clientZkProfileOperations, automaticNetworkRetry); - this.signalAgent = signalAgent; - this.connectivityListener = listener; - this.sleepTimer = timer; - this.clientZkProfileOperations = clientZkProfileOperations; + this.socket = new PushServiceSocket(urls, credentials, signalAgent, clientZkProfileOperations, automaticNetworkRetry); } /** diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java index b00a5704ec..d9874ca85e 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/SignalWebSocket.java @@ -4,6 +4,7 @@ import org.whispersystems.libsignal.logging.Log; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess; import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.api.websocket.WebSocketFactory; import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; @@ -15,7 +16,12 @@ import org.whispersystems.util.Base64; import java.io.IOException; import java.util.concurrent.TimeoutException; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.disposables.CompositeDisposable; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.BehaviorSubject; /** * Provide a general interface to the WebSocket for making requests and reading messages sent by the server. @@ -29,14 +35,43 @@ public final class SignalWebSocket { private final WebSocketFactory webSocketFactory; - private WebSocketConnection webSocket; - private WebSocketConnection unidentifiedWebSocket; - private boolean canConnect; + private WebSocketConnection webSocket; + private final BehaviorSubject webSocketState; + private CompositeDisposable webSocketStateDisposable; + + private WebSocketConnection unidentifiedWebSocket; + private final BehaviorSubject unidentifiedWebSocketState; + private CompositeDisposable unidentifiedWebSocketStateDisposable; + + private boolean canConnect; public SignalWebSocket(WebSocketFactory webSocketFactory) { - this.webSocketFactory = webSocketFactory; + this.webSocketFactory = webSocketFactory; + this.webSocketState = BehaviorSubject.createDefault(WebSocketConnectionState.DISCONNECTED); + this.unidentifiedWebSocketState = BehaviorSubject.createDefault(WebSocketConnectionState.DISCONNECTED); + this.webSocketStateDisposable = new CompositeDisposable(); + this.unidentifiedWebSocketStateDisposable = new CompositeDisposable(); } + /** + * Get an observable stream of the identified WebSocket state. This observable is valid for the lifetime of + * the instance, and will update as WebSocketConnections are remade. + */ + public Observable getWebSocketState() { + return webSocketState; + } + + /** + * Get an observable stream of the unidentified WebSocket state. This observable is valid for the lifetime of + * the instance, and will update as WebSocketConnections are remade. + */ + public Observable getUnidentifiedWebSocketState() { + return unidentifiedWebSocketState; + } + + /** + * Indicate that WebSocketConnections can now be made and attempt to connect both of them. + */ public synchronized void connect() { canConnect = true; try { @@ -47,17 +82,54 @@ public final class SignalWebSocket { } } + /** + * Indicate that WebSocketConnections can no longer be made and disconnect both of them. + */ public synchronized void disconnect() { canConnect = false; + disconnectIdentified(); + disconnectUnidentified(); + } + /** + * Indicate that the current WebSocket instances need to be destroyed and new ones should be created the + * next time a connection is required. Intended to be used by the health monitor to cycle a WebSocket. + */ + public synchronized void forceNewWebSockets() { + Log.i(TAG, "Forcing new WebSockets " + + " identified: " + (webSocket != null ? webSocket.getName() : "[null]") + + " unidentified: " + (unidentifiedWebSocket != null ? unidentifiedWebSocket.getName() : "[null]") + + " canConnect: " + canConnect); + + disconnectIdentified(); + disconnectUnidentified(); + } + + private void disconnectIdentified() { if (webSocket != null) { + webSocketStateDisposable.dispose(); + webSocket.disconnect(); webSocket = null; - } + //noinspection ConstantConditions + if (!webSocketState.getValue().isFailure()) { + webSocketState.onNext(WebSocketConnectionState.DISCONNECTED); + } + } + } + + private void disconnectUnidentified() { if (unidentifiedWebSocket != null) { + unidentifiedWebSocketStateDisposable.dispose(); + unidentifiedWebSocket.disconnect(); unidentifiedWebSocket = null; + + //noinspection ConstantConditions + if (!unidentifiedWebSocketState.getValue().isFailure()) { + unidentifiedWebSocketState.onNext(WebSocketConnectionState.DISCONNECTED); + } } } @@ -67,8 +139,16 @@ public final class SignalWebSocket { } if (webSocket == null || webSocket.isDead()) { - webSocket = webSocketFactory.createWebSocket(); - webSocket.connect(); + webSocketStateDisposable.dispose(); + + webSocket = webSocketFactory.createWebSocket(); + webSocketStateDisposable = new CompositeDisposable(); + + Disposable state = webSocket.connect() + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .subscribe(webSocketState::onNext); + webSocketStateDisposable.add(state); } return webSocket; } @@ -79,12 +159,34 @@ public final class SignalWebSocket { } if (unidentifiedWebSocket == null || unidentifiedWebSocket.isDead()) { - unidentifiedWebSocket = webSocketFactory.createUnidentifiedWebSocket(); - unidentifiedWebSocket.connect(); + unidentifiedWebSocketStateDisposable.dispose(); + + unidentifiedWebSocket = webSocketFactory.createUnidentifiedWebSocket(); + unidentifiedWebSocketStateDisposable = new CompositeDisposable(); + + Disposable state = unidentifiedWebSocket.connect() + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .subscribe(unidentifiedWebSocketState::onNext); + unidentifiedWebSocketStateDisposable.add(state); } return unidentifiedWebSocket; } + /** + * Send keep-alive messages over both WebSocketConnections. + */ + public synchronized void sendKeepAlive() throws IOException { + if (canConnect) { + try { + getWebSocket().sendKeepAlive(); + getUnidentifiedWebSocket().sendKeepAlive(); + } catch (WebSocketUnavailableException e) { + throw new AssertionError(e); + } + } + } + public Single request(WebSocketRequestMessage requestMessage) { try { return getWebSocket().sendRequest(requestMessage); @@ -98,20 +200,18 @@ public final class SignalWebSocket { WebSocketRequestMessage message = WebSocketRequestMessage.newBuilder(requestMessage) .addHeaders("Unidentified-Access-Key:" + Base64.encodeBytes(unidentifiedAccess.get().getUnidentifiedAccessKey())) .build(); - Single response; try { - response = getUnidentifiedWebSocket().sendRequest(message); + return getUnidentifiedWebSocket().sendRequest(message) + .flatMap(r -> { + if (r.getStatus() == 401) { + return request(requestMessage); + } + return Single.just(r); + }) + .onErrorResumeNext(t -> request(requestMessage)); } catch (IOException e) { return Single.error(e); } - - return response.flatMap(r -> { - if (r.getStatus() == 401) { - return request(requestMessage); - } - return Single.just(r); - }) - .onErrorResumeNext(t -> request(requestMessage)); } else { return request(requestMessage); } diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/ConnectivityListener.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/ConnectivityListener.java deleted file mode 100644 index bae701edb4..0000000000 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/ConnectivityListener.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.whispersystems.signalservice.api.websocket; - - -import okhttp3.Response; - -public interface ConnectivityListener { - void onConnected(); - void onConnecting(); - void onDisconnected(); - void onAuthenticationFailure(); - boolean onGenericFailure(Response response, Throwable throwable); -} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/HealthMonitor.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/HealthMonitor.java new file mode 100644 index 0000000000..692780bb27 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/HealthMonitor.java @@ -0,0 +1,10 @@ +package org.whispersystems.signalservice.api.websocket; + +/** + * Callbacks to provide WebSocket health information to a monitor. + */ +public interface HealthMonitor { + void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket); + + void onMessageError(int status, boolean isIdentifiedWebSocket); +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketConnectionState.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketConnectionState.java new file mode 100644 index 0000000000..ea2addddf2 --- /dev/null +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketConnectionState.java @@ -0,0 +1,18 @@ +package org.whispersystems.signalservice.api.websocket; + +/** + * Represent the state of a single WebSocketConnection. + */ +public enum WebSocketConnectionState { + DISCONNECTED, + CONNECTING, + CONNECTED, + RECONNECTING, + DISCONNECTING, + AUTHENTICATION_FAILED, + FAILED; + + public boolean isFailure() { + return this == AUTHENTICATION_FAILED || this == FAILED; + } +} diff --git a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java index d8c05d22b6..e2487f3d11 100644 --- a/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java +++ b/libsignal/service/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java @@ -7,10 +7,10 @@ import org.whispersystems.libsignal.util.Pair; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.push.TrustStore; import org.whispersystems.signalservice.api.util.CredentialsProvider; -import org.whispersystems.signalservice.api.util.SleepTimer; import org.whispersystems.signalservice.api.util.Tls12SocketFactory; import org.whispersystems.signalservice.api.util.TlsProxySocketFactory; -import org.whispersystems.signalservice.api.websocket.ConnectivityListener; +import org.whispersystems.signalservice.api.websocket.HealthMonitor; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.internal.configuration.SignalProxy; import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration; import org.whispersystems.signalservice.internal.util.BlacklistingTrustManager; @@ -20,21 +20,24 @@ import java.io.IOException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.BehaviorSubject; import io.reactivex.rxjava3.subjects.SingleSubject; import okhttp3.ConnectionSpec; import okhttp3.Dns; @@ -53,45 +56,40 @@ import static org.whispersystems.signalservice.internal.websocket.WebSocketProto public class WebSocketConnection extends WebSocketListener { private static final String TAG = WebSocketConnection.class.getSimpleName(); - private static final int KEEPALIVE_TIMEOUT_SECONDS = 55; + public static final int KEEPALIVE_TIMEOUT_SECONDS = 55; private final LinkedList incomingRequests = new LinkedList<>(); private final Map outgoingRequests = new HashMap<>(); + private final Set keepAlives = new HashSet<>(); - private final String name; - private final String wsUri; - private final TrustStore trustStore; - private final Optional credentialsProvider; - private final String signalAgent; - private ConnectivityListener listener; - private final SleepTimer sleepTimer; - private final List interceptors; - private final Optional dns; - private final Optional signalProxy; + private final String name; + private final String wsUri; + private final TrustStore trustStore; + private final Optional credentialsProvider; + private final String signalAgent; + private final HealthMonitor healthMonitor; + private final List interceptors; + private final Optional dns; + private final Optional signalProxy; + private final BehaviorSubject webSocketState; - private WebSocket client; - private KeepAliveSender keepAliveSender; - private int attempts; - private boolean connected; + private WebSocket client; public WebSocketConnection(String name, SignalServiceConfiguration serviceConfiguration, Optional credentialsProvider, String signalAgent, - ConnectivityListener listener, - SleepTimer timer) + HealthMonitor healthMonitor) { this.name = "[" + name + ":" + System.identityHashCode(this) + "]"; this.trustStore = serviceConfiguration.getSignalServiceUrls()[0].getTrustStore(); this.credentialsProvider = credentialsProvider; this.signalAgent = signalAgent; - this.listener = listener; - this.sleepTimer = timer; this.interceptors = serviceConfiguration.getNetworkInterceptors(); this.dns = serviceConfiguration.getDns(); this.signalProxy = serviceConfiguration.getSignalProxy(); - this.attempts = 0; - this.connected = false; + this.healthMonitor = healthMonitor; + this.webSocketState = BehaviorSubject.createDefault(WebSocketConnectionState.DISCONNECTED); String uri = serviceConfiguration.getSignalServiceUrls()[0].getUrl().replace("https://", "wss://").replace("http://", "ws://"); @@ -102,7 +100,11 @@ public class WebSocketConnection extends WebSocketListener { } } - public synchronized void connect() { + public String getName() { + return name; + } + + public synchronized Observable connect() { log("connect()"); if (client == null) { @@ -117,12 +119,12 @@ public class WebSocketConnection extends WebSocketListener { Pair socketFactory = createTlsSocketFactory(trustStore); - OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder() - .sslSocketFactory(new Tls12SocketFactory(socketFactory.first()), socketFactory.second()) - .connectionSpecs(Util.immutableList(ConnectionSpec.RESTRICTED_TLS)) - .readTimeout(KEEPALIVE_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS) - .dns(dns.or(Dns.SYSTEM)) - .connectTimeout(KEEPALIVE_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS); + OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder().sslSocketFactory(new Tls12SocketFactory(socketFactory.first()), + socketFactory.second()) + .connectionSpecs(Util.immutableList(ConnectionSpec.RESTRICTED_TLS)) + .readTimeout(KEEPALIVE_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS) + .dns(dns.or(Dns.SYSTEM)) + .connectTimeout(KEEPALIVE_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS); for (Interceptor interceptor : interceptors) { clientBuilder.addInterceptor(interceptor); @@ -140,13 +142,11 @@ public class WebSocketConnection extends WebSocketListener { requestBuilder.addHeader("X-Signal-Agent", signalAgent); } - if (listener != null) { - listener.onConnecting(); - } + webSocketState.onNext(WebSocketConnectionState.CONNECTING); - this.connected = false; - this.client = okHttpClient.newWebSocket(requestBuilder.build(), this); + this.client = okHttpClient.newWebSocket(requestBuilder.build(), this); } + return webSocketState; } public synchronized boolean isDead() { @@ -158,18 +158,8 @@ public class WebSocketConnection extends WebSocketListener { if (client != null) { client.close(1000, "OK"); - client = null; - connected = false; - } - - if (keepAliveSender != null) { - keepAliveSender.shutdown(); - keepAliveSender = null; - } - - if (listener != null) { - listener.onDisconnected(); - listener = null; + client = null; + webSocketState.onNext(WebSocketConnectionState.DISCONNECTING); } notifyAll(); @@ -198,7 +188,7 @@ public class WebSocketConnection extends WebSocketListener { } public synchronized Single sendRequest(WebSocketRequestMessage request) throws IOException { - if (client == null || !connected) { + if (client == null) { throw new IOException("No connection!"); } @@ -235,17 +225,20 @@ public class WebSocketConnection extends WebSocketListener { } } - private synchronized void sendKeepAlive() throws IOException { - if (keepAliveSender != null && client != null) { + public synchronized void sendKeepAlive() throws IOException { + if (client != null) { + log( "Sending keep alive..."); + long id = System.currentTimeMillis(); byte[] message = WebSocketMessage.newBuilder() .setType(WebSocketMessage.Type.REQUEST) .setRequest(WebSocketRequestMessage.newBuilder() - .setId(System.currentTimeMillis()) + .setId(id) .setPath("/v1/keepalive") .setVerb("GET") - .build()).build() + .build()) + .build() .toByteArray(); - + keepAlives.add(id); if (!client.send(ByteString.of(message))) { throw new IOException("Write failed!"); } @@ -254,16 +247,9 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onOpen(WebSocket webSocket, Response response) { - if (client != null && keepAliveSender == null) { + if (client != null) { log("onOpen() connected"); - attempts = 0; - connected = true; - keepAliveSender = new KeepAliveSender(); - keepAliveSender.start(); - - if (listener != null) { - listener.onConnected(); - } + webSocketState.onNext(WebSocketConnectionState.CONNECTED); } } @@ -280,6 +266,11 @@ public class WebSocketConnection extends WebSocketListener { listener.onSuccess(new WebsocketResponse(message.getResponse().getStatus(), new String(message.getResponse().getBody().toByteArray()), message.getResponse().getHeadersList())); + if (message.getResponse().getStatus() >= 400) { + healthMonitor.onMessageError(message.getResponse().getStatus(), credentialsProvider.isPresent()); + } + } else if (keepAlives.remove(message.getResponse().getId())) { + healthMonitor.onKeepAliveResponse(message.getResponse().getId(), credentialsProvider.isPresent()); } } @@ -292,34 +283,9 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onClosed(WebSocket webSocket, int code, String reason) { log("onClose()"); - this.connected = false; + webSocketState.onNext(WebSocketConnectionState.DISCONNECTED); - Iterator> iterator = outgoingRequests.entrySet().iterator(); - - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - entry.getValue().onError(new IOException("Closed: " + code + ", " + reason)); - iterator.remove(); - } - - if (keepAliveSender != null) { - keepAliveSender.shutdown(); - keepAliveSender = null; - } - - if (listener != null) { - listener.onDisconnected(); - } - - Util.wait(this, Math.min(++attempts * 200, TimeUnit.SECONDS.toMillis(15))); - - if (client != null) { - log("Client not null when closed, attempting to reconnect"); - client.close(1000, "OK"); - client = null; - connected = false; - connect(); - } + cleanupAfterShutdown(); notifyAll(); } @@ -329,19 +295,29 @@ public class WebSocketConnection extends WebSocketListener { warn("onFailure()", t); if (response != null && (response.code() == 401 || response.code() == 403)) { - if (listener != null) { - listener.onAuthenticationFailure(); - } - } else if (listener != null) { - boolean shouldRetryConnection = listener.onGenericFailure(response, t); - if (!shouldRetryConnection) { - warn("Experienced a failure, and the listener indicated we should not retry the connection. Disconnecting."); - disconnect(); - } + webSocketState.onNext(WebSocketConnectionState.AUTHENTICATION_FAILED); + } else { + webSocketState.onNext(WebSocketConnectionState.FAILED); + } + + cleanupAfterShutdown(); + + notifyAll(); + } + + private void cleanupAfterShutdown() { + Iterator> iterator = outgoingRequests.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + entry.getValue().onError(new IOException("Closed unexpectedly")); + iterator.remove(); } if (client != null) { - onClosed(webSocket, 1000, "OK"); + log("Client not null when closed"); + client.close(1000, "OK"); + client = null; } } @@ -353,6 +329,7 @@ public class WebSocketConnection extends WebSocketListener { @Override public synchronized void onClosing(WebSocket webSocket, int code, String reason) { log("onClosing()"); + webSocketState.onNext(WebSocketConnectionState.DISCONNECTING); webSocket.close(1000, "OK"); } @@ -390,30 +367,6 @@ public class WebSocketConnection extends WebSocketListener { Log.w(TAG, name + " " + message, e); } - private class KeepAliveSender extends Thread { - - private final AtomicBoolean stop = new AtomicBoolean(false); - - public void run() { - while (!stop.get()) { - try { - sleepTimer.sleep(TimeUnit.SECONDS.toMillis(KEEPALIVE_TIMEOUT_SECONDS)); - - if (!stop.get()) { - log("Sending keep alive..."); - sendKeepAlive(); - } - } catch (Throwable e) { - warn(e); - } - } - } - - public void shutdown() { - stop.set(true); - } - } - private static class OutgoingRequest { private final SingleSubject responseSingle;