diff --git a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java index 6b829d03bc..3c26868629 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java +++ b/app/src/main/java/org/thoughtcrime/securesms/net/SignalWebSocketHealthMonitor.java @@ -5,6 +5,7 @@ import android.app.Application; import androidx.annotation.NonNull; import org.greenrobot.eventbus.EventBus; +import org.signal.core.util.ThreadUtil; import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.events.ReminderUpdateEvent; @@ -17,6 +18,8 @@ import org.whispersystems.signalservice.api.websocket.HealthMonitor; import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.internal.websocket.WebSocketConnection; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.schedulers.Schedulers; @@ -35,11 +38,13 @@ public final class SignalWebSocketHealthMonitor implements HealthMonitor { private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS); private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3; + private final Executor executor = ThreadUtil.trace(Executors.newSingleThreadExecutor()); + private final Application context; private SignalWebSocket signalWebSocket; private final SleepTimer sleepTimer; - private volatile KeepAliveSender keepAliveSender; + private KeepAliveSender keepAliveSender; private final HealthState identified = new HealthState(); private final HealthState unidentified = new HealthState(); @@ -50,72 +55,80 @@ public final class SignalWebSocketHealthMonitor implements HealthMonitor { } public void monitor(@NonNull SignalWebSocket signalWebSocket) { - Preconditions.checkNotNull(signalWebSocket); - Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once"); + executor.execute(() -> { + Preconditions.checkNotNull(signalWebSocket); + Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once"); - this.signalWebSocket = signalWebSocket; + this.signalWebSocket = signalWebSocket; - //noinspection ResultOfMethodCallIgnored - signalWebSocket.getWebSocketState() - .subscribeOn(Schedulers.computation()) - .observeOn(Schedulers.computation()) - .distinctUntilChanged() - .subscribe(s -> onStateChange(s, identified)); + //noinspection ResultOfMethodCallIgnored + signalWebSocket.getWebSocketState() + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(s -> onStateChange(s, identified)); - //noinspection ResultOfMethodCallIgnored - signalWebSocket.getUnidentifiedWebSocketState() - .subscribeOn(Schedulers.computation()) - .observeOn(Schedulers.computation()) - .distinctUntilChanged() - .subscribe(s -> onStateChange(s, unidentified)); + //noinspection ResultOfMethodCallIgnored + signalWebSocket.getUnidentifiedWebSocketState() + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(s -> onStateChange(s, unidentified)); + }); } - private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) { - switch (connectionState) { - case CONNECTED: - TextSecurePreferences.setUnauthorizedReceived(context, false); - break; - case AUTHENTICATION_FAILED: - TextSecurePreferences.setUnauthorizedReceived(context, true); - EventBus.getDefault().post(new ReminderUpdateEvent()); - break; - case FAILED: - if (SignalStore.proxy().isProxyEnabled()) { - Log.w(TAG, "Encountered an error while we had a proxy set! Terminating the connection to prevent retry spam."); - ApplicationDependencies.closeConnections(); - } - break; - } + private void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) { + executor.execute(() -> { + switch (connectionState) { + case CONNECTED: + TextSecurePreferences.setUnauthorizedReceived(context, false); + break; + case AUTHENTICATION_FAILED: + TextSecurePreferences.setUnauthorizedReceived(context, true); + EventBus.getDefault().post(new ReminderUpdateEvent()); + break; + case FAILED: + if (SignalStore.proxy().isProxyEnabled()) { + Log.w(TAG, "Encountered an error while we had a proxy set! Terminating the connection to prevent retry spam."); + ApplicationDependencies.closeConnections(); + } + break; + } - healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED; + healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED; - if (keepAliveSender == null && isKeepAliveNecessary()) { - keepAliveSender = new KeepAliveSender(); - keepAliveSender.start(); - } else if (keepAliveSender != null && !isKeepAliveNecessary()) { - keepAliveSender.shutdown(); - keepAliveSender = null; - } + if (keepAliveSender == null && isKeepAliveNecessary()) { + keepAliveSender = new KeepAliveSender(); + keepAliveSender.start(); + } else if (keepAliveSender != null && !isKeepAliveNecessary()) { + keepAliveSender.shutdown(); + keepAliveSender = null; + } + }); } @Override public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) { - if (isIdentifiedWebSocket) { - identified.lastKeepAliveReceived = System.currentTimeMillis(); - } else { - unidentified.lastKeepAliveReceived = System.currentTimeMillis(); - } + executor.execute(() -> { + if (isIdentifiedWebSocket) { + identified.lastKeepAliveReceived = System.currentTimeMillis(); + } else { + unidentified.lastKeepAliveReceived = System.currentTimeMillis(); + } + }); } @Override public void onMessageError(int status, boolean isIdentifiedWebSocket) { - if (status == 409) { - HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified); - if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) { - Log.w(TAG, "Received too many mismatch device errors, forcing new websockets."); - signalWebSocket.forceNewWebSockets(); + executor.execute(() -> { + if (status == 409) { + HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified); + if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) { + Log.w(TAG, "Received too many mismatch device errors, forcing new websockets."); + signalWebSocket.forceNewWebSockets(); + } } - } + }); } private boolean isKeepAliveNecessary() {