Separate and kotlinize websockets.

This commit is contained in:
Cody Henthorne
2025-03-05 15:33:15 -05:00
committed by Michelle Tang
parent 6c9acf4657
commit 93d18c1763
26 changed files with 662 additions and 679 deletions

View File

@@ -259,6 +259,7 @@ public class ApplicationContext extends Application implements AppForegroundObse
checkFreeDiskSpace();
MemoryTracker.start();
BackupSubscriptionCheckJob.enqueueIfAble();
AppDependencies.getUnauthWebSocket().setShouldSendKeepAlives(true);
long lastForegroundTime = SignalStore.misc().getLastForegroundTime();
long currentTime = System.currentTimeMillis();
@@ -282,6 +283,7 @@ public class ApplicationContext extends Application implements AppForegroundObse
AppDependencies.getFrameRateTracker().stop();
AppDependencies.getShakeToReport().disable();
AppDependencies.getDeadlockDetector().stop();
AppDependencies.getUnauthWebSocket().setShouldSendKeepAlives(false);
MemoryTracker.stop();
AnrDetector.stop();
}
@@ -378,7 +380,7 @@ public class ApplicationContext extends Application implements AppForegroundObse
}
public void initializeMessageRetrieval() {
AppDependencies.getIncomingMessageObserver();
AppDependencies.startNetwork();
}
@VisibleForTesting

View File

@@ -190,7 +190,7 @@ class ChangeNumberRepository(
StorageSyncHelper.scheduleSyncForDataChange()
AppDependencies.resetNetwork()
AppDependencies.incomingMessageObserver
AppDependencies.startNetwork()
AppDependencies.jobManager.add(RefreshAttributesJob())

View File

@@ -93,7 +93,7 @@ class AdvancedPrivacySettingsViewModel(
val isCountryCodeCensoredByDefault: Boolean = AppDependencies.signalServiceNetworkAccess.isCountryCodeCensoredByDefault(countryCode)
val enabledState: SettingsValues.CensorshipCircumventionEnabled = SignalStore.settings.censorshipCircumventionEnabled
val hasInternet: Boolean = NetworkConstraint.isMet(AppDependencies.application)
val websocketConnected: Boolean = AppDependencies.signalWebSocket.webSocketState.firstOrError().blockingGet() == WebSocketConnectionState.CONNECTED
val websocketConnected: Boolean = AppDependencies.authWebSocket.state.firstOrError().blockingGet() == WebSocketConnectionState.CONNECTED
return when {
SignalStore.internal.allowChangingCensorshipSetting -> {

View File

@@ -42,7 +42,6 @@ import org.whispersystems.signalservice.api.SignalServiceAccountManager
import org.whispersystems.signalservice.api.SignalServiceDataStore
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver
import org.whispersystems.signalservice.api.SignalServiceMessageSender
import org.whispersystems.signalservice.api.SignalWebSocket
import org.whispersystems.signalservice.api.archive.ArchiveApi
import org.whispersystems.signalservice.api.attachment.AttachmentApi
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations
@@ -53,6 +52,7 @@ import org.whispersystems.signalservice.api.services.CallLinksService
import org.whispersystems.signalservice.api.services.DonationsService
import org.whispersystems.signalservice.api.services.ProfileService
import org.whispersystems.signalservice.api.storage.StorageServiceApi
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
import org.whispersystems.signalservice.internal.push.PushServiceSocket
@@ -214,7 +214,7 @@ object AppDependencies {
/**
* An observable that emits the current state of the WebSocket connection across the various lifecycles
* of the [signalWebSocket].
* of the [authWebSocket].
*/
@JvmStatic
val webSocketObserver: LatestValueObservable<WebSocketConnectionState> = LatestValueObservable(_webSocketObserver)
@@ -253,8 +253,12 @@ object AppDependencies {
get() = networkModule.libsignalNetwork
@JvmStatic
val signalWebSocket: SignalWebSocket
get() = networkModule.signalWebSocket
val authWebSocket: SignalWebSocket.AuthenticatedWebSocket
get() = networkModule.authWebSocket
@JvmStatic
val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket
get() = networkModule.unauthWebSocket
@JvmStatic
val groupsV2Authorization: GroupsV2Authorization
@@ -326,11 +330,16 @@ object AppDependencies {
_networkModule.reset()
}
@JvmStatic
fun startNetwork() {
networkModule.openConnections()
}
interface Provider {
fun providePushServiceSocket(signalServiceConfiguration: SignalServiceConfiguration, groupsV2Operations: GroupsV2Operations): PushServiceSocket
fun provideGroupsV2Operations(signalServiceConfiguration: SignalServiceConfiguration): GroupsV2Operations
fun provideSignalServiceAccountManager(pushServiceSocket: PushServiceSocket, groupsV2Operations: GroupsV2Operations): SignalServiceAccountManager
fun provideSignalServiceMessageSender(signalWebSocket: SignalWebSocket, protocolStore: SignalServiceDataStore, pushServiceSocket: PushServiceSocket): SignalServiceMessageSender
fun provideSignalServiceMessageSender(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, protocolStore: SignalServiceDataStore, pushServiceSocket: PushServiceSocket): SignalServiceMessageSender
fun provideSignalServiceMessageReceiver(pushServiceSocket: PushServiceSocket): SignalServiceMessageReceiver
fun provideSignalServiceNetworkAccess(): SignalServiceNetworkAccess
fun provideRecipientCache(): LiveRecipientCache
@@ -339,7 +348,7 @@ object AppDependencies {
fun provideMegaphoneRepository(): MegaphoneRepository
fun provideEarlyMessageCache(): EarlyMessageCache
fun provideMessageNotifier(): MessageNotifier
fun provideIncomingMessageObserver(signalWebSocket: SignalWebSocket): IncomingMessageObserver
fun provideIncomingMessageObserver(webSocket: SignalWebSocket.AuthenticatedWebSocket): IncomingMessageObserver
fun provideTrimThreadsByDateManager(): TrimThreadsByDateManager
fun provideViewOnceMessageManager(): ViewOnceMessageManager
fun provideExpiringStoriesManager(): ExpiringStoriesManager
@@ -353,14 +362,13 @@ object AppDependencies {
fun provideSignalCallManager(): SignalCallManager
fun providePendingRetryReceiptManager(): PendingRetryReceiptManager
fun providePendingRetryReceiptCache(): PendingRetryReceiptCache
fun provideSignalWebSocket(signalServiceConfigurationSupplier: Supplier<SignalServiceConfiguration>, libSignalNetworkSupplier: Supplier<Network>): SignalWebSocket
fun provideProtocolStore(): SignalServiceDataStoreImpl
fun provideGiphyMp4Cache(): GiphyMp4Cache
fun provideExoPlayerPool(): SimpleExoPlayerPool
fun provideAndroidCallAudioManager(): AudioManagerCompat
fun provideDonationsService(pushServiceSocket: PushServiceSocket): DonationsService
fun provideCallLinksService(pushServiceSocket: PushServiceSocket): CallLinksService
fun provideProfileService(profileOperations: ClientZkProfileOperations, signalServiceMessageReceiver: SignalServiceMessageReceiver, signalWebSocket: SignalWebSocket): ProfileService
fun provideProfileService(profileOperations: ClientZkProfileOperations, signalServiceMessageReceiver: SignalServiceMessageReceiver, authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): ProfileService
fun provideDeadlockDetector(): DeadlockDetector
fun provideClientZkReceiptOperations(signalServiceConfiguration: SignalServiceConfiguration): ClientZkReceiptOperations
fun provideScheduledMessageManager(): ScheduledMessageManager
@@ -368,9 +376,11 @@ object AppDependencies {
fun provideBillingApi(): BillingApi
fun provideArchiveApi(pushServiceSocket: PushServiceSocket): ArchiveApi
fun provideKeysApi(pushServiceSocket: PushServiceSocket): KeysApi
fun provideAttachmentApi(signalWebSocket: SignalWebSocket, pushServiceSocket: PushServiceSocket): AttachmentApi
fun provideAttachmentApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, pushServiceSocket: PushServiceSocket): AttachmentApi
fun provideLinkDeviceApi(pushServiceSocket: PushServiceSocket): LinkDeviceApi
fun provideRegistrationApi(pushServiceSocket: PushServiceSocket): RegistrationApi
fun provideStorageServiceApi(pushServiceSocket: PushServiceSocket): StorageServiceApi
fun provideAuthWebSocket(signalServiceConfigurationSupplier: Supplier<SignalServiceConfiguration>, libSignalNetworkSupplier: Supplier<Network>): SignalWebSocket.AuthenticatedWebSocket
fun provideUnauthWebSocket(signalServiceConfigurationSupplier: Supplier<SignalServiceConfiguration>, libSignalNetworkSupplier: Supplier<Network>): SignalWebSocket.UnauthenticatedWebSocket
}
}

View File

@@ -81,7 +81,6 @@ import org.whispersystems.signalservice.api.SignalServiceAccountManager;
import org.whispersystems.signalservice.api.SignalServiceDataStore;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.archive.ArchiveApi;
import org.whispersystems.signalservice.api.attachment.AttachmentApi;
import org.whispersystems.signalservice.api.groupsv2.ClientZkOperations;
@@ -98,6 +97,8 @@ import org.whispersystems.signalservice.api.storage.StorageServiceApi;
import org.whispersystems.signalservice.api.util.CredentialsProvider;
import org.whispersystems.signalservice.api.util.SleepTimer;
import org.whispersystems.signalservice.api.util.UptimeSleepTimer;
import org.whispersystems.signalservice.api.websocket.HealthMonitor;
import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
import org.whispersystems.signalservice.api.websocket.WebSocketFactory;
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration;
import org.whispersystems.signalservice.internal.push.PushServiceSocket;
@@ -147,11 +148,12 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket signalWebSocket, @NonNull SignalServiceDataStore protocolStore, @NonNull PushServiceSocket pushServiceSocket) {
public @NonNull SignalServiceMessageSender provideSignalServiceMessageSender(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, @NonNull SignalServiceDataStore protocolStore, @NonNull PushServiceSocket pushServiceSocket) {
return new SignalServiceMessageSender(pushServiceSocket,
protocolStore,
ReentrantSessionLock.INSTANCE,
signalWebSocket,
authWebSocket,
unauthWebSocket,
Optional.of(new SecurityEventListener(context)),
SignalExecutors.newCachedBoundedExecutor("signal-messages", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD, 1, 16, 30),
ByteUnit.KILOBYTES.toBytes(256));
@@ -207,8 +209,8 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull IncomingMessageObserver provideIncomingMessageObserver(@NonNull SignalWebSocket signalWebSocket) {
return new IncomingMessageObserver(context, signalWebSocket);
public @NonNull IncomingMessageObserver provideIncomingMessageObserver(@NonNull SignalWebSocket.AuthenticatedWebSocket webSocket) {
return new IncomingMessageObserver(context, webSocket);
}
@Override
@@ -297,15 +299,29 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull SignalWebSocket provideSignalWebSocket(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier, @NonNull Supplier<Network> libSignalNetworkSupplier) {
SleepTimer sleepTimer = !SignalStore.account().isFcmEnabled() || SignalStore.internal().isWebsocketModeForced() ? new AlarmSleepTimer(context) : new UptimeSleepTimer();
SignalWebSocketHealthMonitor healthMonitor = new SignalWebSocketHealthMonitor(context, sleepTimer);
WebSocketShadowingBridge bridge = new DefaultWebSocketShadowingBridge(context);
SignalWebSocket signalWebSocket = new SignalWebSocket(provideWebSocketFactory(signalServiceConfigurationSupplier, healthMonitor, libSignalNetworkSupplier, bridge));
public @NonNull SignalWebSocket.AuthenticatedWebSocket provideAuthWebSocket(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier, @NonNull Supplier<Network> libSignalNetworkSupplier) {
SleepTimer sleepTimer = !SignalStore.account().isFcmEnabled() || SignalStore.internal().isWebsocketModeForced() ? new AlarmSleepTimer(context) : new UptimeSleepTimer();
SignalWebSocketHealthMonitor healthMonitor = new SignalWebSocketHealthMonitor(sleepTimer);
WebSocketShadowingBridge bridge = new DefaultWebSocketShadowingBridge(context);
WebSocketFactory webSocketFactory = provideWebSocketFactory(signalServiceConfigurationSupplier, healthMonitor, libSignalNetworkSupplier, bridge);
SignalWebSocket.AuthenticatedWebSocket webSocket = new SignalWebSocket.AuthenticatedWebSocket(webSocketFactory::createWebSocket);
healthMonitor.monitor(signalWebSocket);
healthMonitor.monitor(webSocket);
return signalWebSocket;
return webSocket;
}
@Override
public @NonNull SignalWebSocket.UnauthenticatedWebSocket provideUnauthWebSocket(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier, @NonNull Supplier<Network> libSignalNetworkSupplier) {
SleepTimer sleepTimer = !SignalStore.account().isFcmEnabled() || SignalStore.internal().isWebsocketModeForced() ? new AlarmSleepTimer(context) : new UptimeSleepTimer();
SignalWebSocketHealthMonitor healthMonitor = new SignalWebSocketHealthMonitor(sleepTimer);
WebSocketShadowingBridge bridge = new DefaultWebSocketShadowingBridge(context);
WebSocketFactory webSocketFactory = provideWebSocketFactory(signalServiceConfigurationSupplier, healthMonitor, libSignalNetworkSupplier, bridge);
SignalWebSocket.UnauthenticatedWebSocket webSocket = new SignalWebSocket.UnauthenticatedWebSocket(webSocketFactory::createUnidentifiedWebSocket);
healthMonitor.monitor(webSocket);
return webSocket;
}
@Override
@@ -383,9 +399,10 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
@Override
public @NonNull ProfileService provideProfileService(@NonNull ClientZkProfileOperations clientZkProfileOperations,
@NonNull SignalServiceMessageReceiver receiver,
@NonNull SignalWebSocket signalWebSocket)
@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket,
@NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket)
{
return new ProfileService(clientZkProfileOperations, receiver, signalWebSocket);
return new ProfileService(clientZkProfileOperations, receiver, authWebSocket, unauthWebSocket);
}
@Override
@@ -401,7 +418,7 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@NonNull WebSocketFactory provideWebSocketFactory(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier,
@NonNull SignalWebSocketHealthMonitor healthMonitor,
@NonNull HealthMonitor healthMonitor,
@NonNull Supplier<Network> libSignalNetworkSupplier,
@NonNull WebSocketShadowingBridge bridge)
{
@@ -479,8 +496,8 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull AttachmentApi provideAttachmentApi(@NonNull SignalWebSocket signalWebSocket, @NonNull PushServiceSocket pushServiceSocket) {
return new AttachmentApi(signalWebSocket, pushServiceSocket);
public @NonNull AttachmentApi provideAttachmentApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull PushServiceSocket pushServiceSocket) {
return new AttachmentApi(authWebSocket, pushServiceSocket);
}
@Override

View File

@@ -26,7 +26,6 @@ import org.thoughtcrime.securesms.push.SignalServiceTrustStore
import org.whispersystems.signalservice.api.SignalServiceAccountManager
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver
import org.whispersystems.signalservice.api.SignalServiceMessageSender
import org.whispersystems.signalservice.api.SignalWebSocket
import org.whispersystems.signalservice.api.archive.ArchiveApi
import org.whispersystems.signalservice.api.attachment.AttachmentApi
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations
@@ -39,6 +38,7 @@ import org.whispersystems.signalservice.api.services.DonationsService
import org.whispersystems.signalservice.api.services.ProfileService
import org.whispersystems.signalservice.api.storage.StorageServiceApi
import org.whispersystems.signalservice.api.util.Tls12SocketFactory
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.internal.push.PushServiceSocket
import org.whispersystems.signalservice.internal.util.BlacklistingTrustManager
@@ -70,12 +70,12 @@ class NetworkDependenciesModule(
val protocolStore: SignalServiceDataStoreImpl by _protocolStore
private val _signalServiceMessageSender = resettableLazy {
provider.provideSignalServiceMessageSender(signalWebSocket, protocolStore, pushServiceSocket)
provider.provideSignalServiceMessageSender(authWebSocket, unauthWebSocket, protocolStore, pushServiceSocket)
}
val signalServiceMessageSender: SignalServiceMessageSender by _signalServiceMessageSender
val incomingMessageObserver: IncomingMessageObserver by lazy {
provider.provideIncomingMessageObserver(signalWebSocket)
provider.provideIncomingMessageObserver(authWebSocket)
}
val pushServiceSocket: PushServiceSocket by lazy {
@@ -90,12 +90,16 @@ class NetworkDependenciesModule(
provider.provideLibsignalNetwork(signalServiceNetworkAccess.getConfiguration())
}
val signalWebSocket: SignalWebSocket by lazy {
provider.provideSignalWebSocket({ signalServiceNetworkAccess.getConfiguration() }, { libsignalNetwork }).also {
disposables += it.webSocketState.subscribe { webSocketStateSubject.onNext(it) }
val authWebSocket: SignalWebSocket.AuthenticatedWebSocket by lazy {
provider.provideAuthWebSocket({ signalServiceNetworkAccess.getConfiguration() }, { libsignalNetwork }).also {
disposables += it.state.subscribe { s -> webSocketStateSubject.onNext(s) }
}
}
val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket by lazy {
provider.provideUnauthWebSocket({ signalServiceNetworkAccess.getConfiguration() }, { libsignalNetwork })
}
val groupsV2Authorization: GroupsV2Authorization by lazy {
val authCache: GroupsV2Authorization.ValueCache = GroupsV2AuthorizationMemoryValueCache(SignalStore.groupsV2AciAuthorizationCache)
GroupsV2Authorization(signalServiceAccountManager.groupsV2Api, authCache)
@@ -122,7 +126,7 @@ class NetworkDependenciesModule(
}
val profileService: ProfileService by lazy {
provider.provideProfileService(groupsV2Operations.profileOperations, signalServiceMessageReceiver, signalWebSocket)
provider.provideProfileService(groupsV2Operations.profileOperations, signalServiceMessageReceiver, authWebSocket, unauthWebSocket)
}
val donationsService: DonationsService by lazy {
@@ -138,7 +142,7 @@ class NetworkDependenciesModule(
}
val attachmentApi: AttachmentApi by lazy {
provider.provideAttachmentApi(signalWebSocket, pushServiceSocket)
provider.provideAttachmentApi(authWebSocket, pushServiceSocket)
}
val linkDeviceApi: LinkDeviceApi by lazy {
@@ -185,9 +189,15 @@ class NetworkDependenciesModule(
if (_signalServiceMessageSender.isInitialized()) {
signalServiceMessageSender.cancelInFlightRequests()
}
unauthWebSocket.disconnect()
disposables.clear()
}
fun openConnections() {
incomingMessageObserver
unauthWebSocket.connect()
}
fun resetProtocolStores() {
_protocolStore.reset()
_signalServiceMessageSender.reset()

View File

@@ -196,6 +196,7 @@ class AttachmentUploadJob private constructor(
if (lastReset > now || lastReset + NETWORK_RESET_THRESHOLD > now) {
Log.w(TAG, "Our existing connections is getting repeatedly denied by the server, reset network to establish new connections")
AppDependencies.resetNetwork()
AppDependencies.startNetwork()
SignalStore.misc.lastNetworkResetDueToStreamResets = now
} else {
Log.i(TAG, "Stream reset during upload, not resetting network yet, last reset: $lastReset")

View File

@@ -33,10 +33,10 @@ import org.thoughtcrime.securesms.util.AppForegroundObserver
import org.thoughtcrime.securesms.util.RemoteConfig
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.thoughtcrime.securesms.util.asChain
import org.whispersystems.signalservice.api.SignalWebSocket
import org.whispersystems.signalservice.api.push.ServiceId
import org.whispersystems.signalservice.api.util.SleepTimer
import org.whispersystems.signalservice.api.util.UptimeSleepTimer
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException
import org.whispersystems.signalservice.internal.push.Envelope
@@ -54,10 +54,9 @@ import kotlin.time.Duration.Companion.seconds
/**
* The application-level manager of our websocket connection.
*
*
* This class is responsible for opening/closing the websocket based on the app's state and observing new inbound messages received on the websocket.
*/
class IncomingMessageObserver(private val context: Application, private val signalWebSocket: SignalWebSocket) {
class IncomingMessageObserver(private val context: Application, private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket) {
companion object {
private val TAG = Log.tag(IncomingMessageObserver::class.java)
@@ -244,7 +243,7 @@ class IncomingMessageObserver(private val context: Application, private val sign
}
private fun disconnect() {
signalWebSocket.disconnect()
authWebSocket.disconnect()
}
@JvmOverloads
@@ -384,7 +383,7 @@ class IncomingMessageObserver(private val context: Application, private val sign
waitForConnectionNecessary()
Log.i(TAG, "Making websocket connection....")
val webSocketDisposable = signalWebSocket.webSocketState.subscribe { state: WebSocketConnectionState ->
val webSocketDisposable = authWebSocket.state.subscribe { state: WebSocketConnectionState ->
Log.d(TAG, "WebSocket State: $state")
// Any change to a non-connected state means that we are not drained
@@ -397,13 +396,13 @@ class IncomingMessageObserver(private val context: Application, private val sign
}
}
signalWebSocket.connect()
authWebSocket.connect()
try {
while (!terminated && isConnectionNecessary()) {
try {
Log.d(TAG, "Reading message...")
val hasMore = signalWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch ->
val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch ->
Log.i(TAG, "Retrieved ${batch.size} envelopes!")
val bufferedStore = BufferedProtocolStore.create()
@@ -425,7 +424,7 @@ class IncomingMessageObserver(private val context: Application, private val sign
AppDependencies.jobManager.addAllChains(jobs)
}
signalWebSocket.sendAck(response)
authWebSocket.sendAck(response)
}
}
}
@@ -448,7 +447,7 @@ class IncomingMessageObserver(private val context: Application, private val sign
}
} catch (e: WebSocketUnavailableException) {
Log.i(TAG, "Pipe unexpectedly unavailable, connecting")
signalWebSocket.connect()
authWebSocket.connect()
} catch (e: TimeoutException) {
Log.w(TAG, "Application level read timeout...")
attempts = 0

View File

@@ -55,6 +55,6 @@ public final class DeviceTransferBlockingInterceptor implements Interceptor {
public void unblockNetwork() {
blockNetworking = false;
AppDependencies.getIncomingMessageObserver();
AppDependencies.startNetwork();
}
}

View File

@@ -1,207 +0,0 @@
package org.thoughtcrime.securesms.net;
import android.app.Application;
import androidx.annotation.NonNull;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.util.Preconditions;
import org.whispersystems.signalservice.api.util.SleepTimer;
import org.whispersystems.signalservice.api.websocket.HealthMonitor;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.internal.websocket.OkHttpWebSocketConnection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.schedulers.Schedulers;
/**
* Monitors the health of the identified and unidentified WebSockets. If either one appears to be
* unhealthy, will trigger restarting both.
* <p>
* The monitor is also responsible for sending heartbeats/keep-alive messages to prevent
* timeouts.
*/
public final class SignalWebSocketHealthMonitor implements HealthMonitor {
private static final String TAG = Log.tag(SignalWebSocketHealthMonitor.class);
/**
* This is the amount of time in between sent keep alives. Must be greater than {@link SignalWebSocketHealthMonitor#KEEP_ALIVE_TIMEOUT}
*/
private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(OkHttpWebSocketConnection.KEEPALIVE_FREQUENCY_SECONDS);
/**
* This is the amount of time we will wait for a response to the keep alive before we consider the websockets dead.
* It is required that this value be less than {@link SignalWebSocketHealthMonitor#KEEP_ALIVE_SEND_CADENCE}
*/
private static final long KEEP_ALIVE_TIMEOUT = TimeUnit.SECONDS.toMillis(20);
private final Executor executor = Executors.newSingleThreadExecutor();
private final Application context;
private SignalWebSocket signalWebSocket;
private final SleepTimer sleepTimer;
private KeepAliveSender keepAliveSender;
private final HealthState identified = new HealthState();
private final HealthState unidentified = new HealthState();
public SignalWebSocketHealthMonitor(@NonNull Application context, @NonNull SleepTimer sleepTimer) {
this.context = context;
this.sleepTimer = sleepTimer;
}
public void monitor(@NonNull SignalWebSocket signalWebSocket) {
executor.execute(() -> {
Preconditions.checkNotNull(signalWebSocket);
Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once");
this.signalWebSocket = signalWebSocket;
//noinspection ResultOfMethodCallIgnored
signalWebSocket.getWebSocketState()
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe(s -> onStateChange(s, identified, true));
//noinspection ResultOfMethodCallIgnored
signalWebSocket.getUnidentifiedWebSocketState()
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe(s -> onStateChange(s, unidentified, false));
});
}
private void onStateChange(WebSocketConnectionState connectionState, HealthState healthState, boolean isIdentified) {
executor.execute(() -> {
switch (connectionState) {
case CONNECTED:
if (isIdentified) {
TextSecurePreferences.setUnauthorizedReceived(context, false);
break;
}
case AUTHENTICATION_FAILED:
if (isIdentified) {
TextSecurePreferences.setUnauthorizedReceived(context, true);
break;
}
case FAILED:
break;
}
healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED;
if (keepAliveSender == null && isKeepAliveNecessary()) {
keepAliveSender = new KeepAliveSender();
keepAliveSender.start();
} else if (keepAliveSender != null && !isKeepAliveNecessary()) {
keepAliveSender.shutdown();
keepAliveSender = null;
}
});
}
@Override
public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) {
final long keepAliveTime = System.currentTimeMillis();
executor.execute(() -> {
if (isIdentifiedWebSocket) {
identified.lastKeepAliveReceived = keepAliveTime;
} else {
unidentified.lastKeepAliveReceived = keepAliveTime;
}
});
}
@Override
public void onMessageError(int status, boolean isIdentifiedWebSocket) {
executor.execute(() -> {
if (status == 409) {
HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified);
if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) {
Log.w(TAG, "Received too many mismatch device errors, forcing new websockets.");
signalWebSocket.forceNewWebSockets();
}
}
});
}
private boolean isKeepAliveNecessary() {
return identified.needsKeepAlive || unidentified.needsKeepAlive;
}
private static class HealthState {
private final HttpErrorTracker mismatchErrorTracker = new HttpErrorTracker(5, TimeUnit.MINUTES.toMillis(1));
private volatile boolean needsKeepAlive;
private volatile long lastKeepAliveReceived;
}
/**
* Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If
* either WebSocket fails to get a return heartbeat after {@link SignalWebSocketHealthMonitor#KEEP_ALIVE_TIMEOUT} seconds, both are forced to be recreated.
*/
private class KeepAliveSender extends Thread {
private volatile boolean shouldKeepRunning = true;
public void run() {
Log.d(TAG, "[KeepAliveSender] started");
identified.lastKeepAliveReceived = System.currentTimeMillis();
unidentified.lastKeepAliveReceived = System.currentTimeMillis();
long keepAliveSendTime = System.currentTimeMillis();
while (shouldKeepRunning && isKeepAliveNecessary()) {
try {
long nextKeepAliveSendTime = (keepAliveSendTime + KEEP_ALIVE_SEND_CADENCE);
sleepUntil(nextKeepAliveSendTime);
if (shouldKeepRunning && isKeepAliveNecessary()) {
keepAliveSendTime = System.currentTimeMillis();
signalWebSocket.sendKeepAlive();
}
final long responseRequiredTime = keepAliveSendTime + KEEP_ALIVE_TIMEOUT;
sleepUntil(responseRequiredTime);
if (shouldKeepRunning && isKeepAliveNecessary()) {
if (identified.lastKeepAliveReceived < keepAliveSendTime || unidentified.lastKeepAliveReceived < keepAliveSendTime) {
Log.w(TAG, "Missed keep alives, identified last: " + identified.lastKeepAliveReceived +
" unidentified last: " + unidentified.lastKeepAliveReceived +
" needed by: " + responseRequiredTime);
signalWebSocket.forceNewWebSockets();
}
}
} catch (Throwable e) {
Log.w(TAG, e);
}
}
Log.d(TAG, "[KeepAliveSender] ended");
}
private void sleepUntil(long timeMs) {
while (System.currentTimeMillis() < timeMs) {
long waitTime = timeMs - System.currentTimeMillis();
if (waitTime > 0) {
try {
sleepTimer.sleep(waitTime);
} catch (InterruptedException e) {
Log.w(TAG, e);
}
}
}
}
public void shutdown() {
shouldKeepRunning = false;
}
}
}

View File

@@ -0,0 +1,172 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.net
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.schedulers.Schedulers
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.util.SleepTimer
import org.whispersystems.signalservice.api.websocket.HealthMonitor
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.internal.websocket.OkHttpWebSocketConnection
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.concurrent.Volatile
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
class SignalWebSocketHealthMonitor(
private val sleepTimer: SleepTimer
) : HealthMonitor {
companion object {
private val TAG = Log.tag(SignalWebSocketHealthMonitor::class)
/**
* This is the amount of time in between sent keep alives. Must be greater than [KEEP_ALIVE_TIMEOUT]
*/
private val KEEP_ALIVE_SEND_CADENCE: Duration = OkHttpWebSocketConnection.KEEPALIVE_FREQUENCY_SECONDS.seconds
/**
* This is the amount of time we will wait for a response to the keep alive before we consider the websockets dead.
* It is required that this value be less than [KEEP_ALIVE_SEND_CADENCE]
*/
private val KEEP_ALIVE_TIMEOUT: Duration = 20.seconds
}
private val executor: Executor = Executors.newSingleThreadExecutor()
private var webSocket: SignalWebSocket? = null
private var keepAliveSender: KeepAliveSender? = null
private var needsKeepAlive = false
private var lastKeepAliveReceived: Duration = 0.seconds
@Suppress("CheckResult")
fun monitor(webSocket: SignalWebSocket) {
executor.execute {
check(this.webSocket == null)
this.webSocket = webSocket
webSocket
.state
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribeBy { onStateChanged(it) }
webSocket.keepAliveChangedListener = this::updateKeepAliveSenderStatus
}
}
private fun onStateChanged(connectionState: WebSocketConnectionState) {
executor.execute {
when (connectionState) {
WebSocketConnectionState.CONNECTED -> {
if (webSocket is SignalWebSocket.AuthenticatedWebSocket) {
TextSecurePreferences.setUnauthorizedReceived(AppDependencies.application, false)
}
}
WebSocketConnectionState.AUTHENTICATION_FAILED -> {
if (webSocket is SignalWebSocket.AuthenticatedWebSocket) {
TextSecurePreferences.setUnauthorizedReceived(AppDependencies.application, true)
}
}
else -> Unit
}
needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED
updateKeepAliveSenderStatus()
}
}
override fun onKeepAliveResponse(sentTimestamp: Long, isIdentifiedWebSocket: Boolean) {
val keepAliveTime = System.currentTimeMillis().milliseconds
executor.execute {
lastKeepAliveReceived = keepAliveTime
}
}
override fun onMessageError(status: Int, isIdentifiedWebSocket: Boolean) = Unit
private fun updateKeepAliveSenderStatus() {
if (keepAliveSender == null && sendKeepAlives()) {
keepAliveSender = KeepAliveSender()
keepAliveSender!!.start()
} else if (keepAliveSender != null && !sendKeepAlives()) {
keepAliveSender!!.shutdown()
keepAliveSender = null
}
}
private fun sendKeepAlives(): Boolean {
return needsKeepAlive && webSocket?.shouldSendKeepAlives == true
}
/**
* Sends periodic heartbeats/keep-alives over the WebSocket to prevent connection timeouts. If
* the WebSocket fails to get a return heartbeat after [KEEP_ALIVE_TIMEOUT] seconds, it is forced to be recreated.
*/
private inner class KeepAliveSender : Thread() {
@Volatile
private var shouldKeepRunning = true
override fun run() {
Log.d(TAG, "[KeepAliveSender($id)] started")
lastKeepAliveReceived = System.currentTimeMillis().milliseconds
var keepAliveSendTime = System.currentTimeMillis().milliseconds
while (shouldKeepRunning && sendKeepAlives()) {
try {
val nextKeepAliveSendTime: Duration = keepAliveSendTime + KEEP_ALIVE_SEND_CADENCE
sleepUntil(nextKeepAliveSendTime)
if (shouldKeepRunning && sendKeepAlives()) {
keepAliveSendTime = System.currentTimeMillis().milliseconds
webSocket?.sendKeepAlive()
}
val responseRequiredTime: Duration = keepAliveSendTime + KEEP_ALIVE_TIMEOUT
sleepUntil(responseRequiredTime)
if (shouldKeepRunning && sendKeepAlives()) {
if (lastKeepAliveReceived < keepAliveSendTime) {
Log.w(TAG, "Missed keep alive, last: ${lastKeepAliveReceived.inWholeMilliseconds} needed by: ${responseRequiredTime.inWholeMilliseconds}")
webSocket?.forceNewWebSocket()
}
}
} catch (e: Throwable) {
Log.w(TAG, e)
}
}
Log.d(TAG, "[KeepAliveSender($id)] ended")
}
fun sleepUntil(time: Duration) {
while (System.currentTimeMillis().milliseconds < time) {
val waitTime = time - System.currentTimeMillis().milliseconds
if (waitTime.isPositive()) {
try {
sleepTimer.sleep(waitTime.inWholeMilliseconds)
} catch (e: InterruptedException) {
Log.w(TAG, e)
}
}
}
}
fun shutdown() {
shouldKeepRunning = false
}
}
}

View File

@@ -222,7 +222,7 @@ object RegistrationRepository {
SvrRepository.onRegistrationComplete(masterKey, data.pin, hasPin, data.reglockEnabled)
AppDependencies.resetNetwork()
AppDependencies.incomingMessageObserver
AppDependencies.startNetwork()
PreKeysSyncJob.enqueue()
val jobManager = AppDependencies.jobManager

View File

@@ -217,7 +217,7 @@ object RegistrationRepository {
SvrRepository.onRegistrationComplete(masterKey, data.pin, hasPin, data.reglockEnabled)
AppDependencies.resetNetwork()
AppDependencies.incomingMessageObserver
AppDependencies.startNetwork()
PreKeysSyncJob.enqueue()
val jobManager = AppDependencies.jobManager

View File

@@ -37,12 +37,12 @@ public final class SignalProxyUtil {
private SignalProxyUtil() {}
public static void startListeningToWebsocket() {
if (SignalStore.proxy().isProxyEnabled() && AppDependencies.getSignalWebSocket().getWebSocketState().firstOrError().blockingGet().isFailure()) {
if (SignalStore.proxy().isProxyEnabled() && AppDependencies.getAuthWebSocket().getState().firstOrError().blockingGet().isFailure()) {
Log.w(TAG, "Proxy is in a failed state. Restarting.");
AppDependencies.resetNetwork();
}
AppDependencies.getIncomingMessageObserver();
AppDependencies.startNetwork();
}
/**
@@ -88,8 +88,8 @@ public final class SignalProxyUtil {
return testWebsocketConnectionUnregistered(timeout);
}
return AppDependencies.getSignalWebSocket()
.getWebSocketState()
return AppDependencies.getAuthWebSocket()
.getState()
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline())
.timeout(timeout, TimeUnit.MILLISECONDS)