From a53479e50de45d43829fc6563c33cab6e0bc0c93 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Wed, 23 Apr 2025 16:44:15 -0400 Subject: [PATCH] Do not process messages while pending restore decision. --- .../securesms/keyvalue/RegistrationValues.kt | 5 +- .../messages/IncomingMessageObserver.kt | 100 ++++++++++-------- 2 files changed, 59 insertions(+), 46 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/RegistrationValues.kt b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/RegistrationValues.kt index ac69d5ffa5..832fcb0f06 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/RegistrationValues.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/RegistrationValues.kt @@ -86,7 +86,10 @@ class RegistrationValues internal constructor(store: KeyValueStore) : SignalStor store.beginWrite() .putBlob(RESTORE_DECISION_STATE, newValue.encode()) .apply() - AppDependencies.incomingMessageObserver.notifyRegistrationStateChanged() + + if (newValue.isTerminal) { + AppDependencies.incomingMessageObserver.notifyRestoreDecisionMade() + } } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index c2c206b830..9d93bb7eb7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -174,6 +174,11 @@ class IncomingMessageObserver(private val context: Application, private val auth connectionNecessarySemaphore.release() } + fun notifyRestoreDecisionMade() { + Log.i(TAG, "Restore decision made, can restart network and process messages") + AppDependencies.resetNetwork() + } + fun addDecryptionDrainedListener(listener: Runnable) { decryptionDrainedListeners.add(listener) if (decryptionDrained) { @@ -216,17 +221,15 @@ class IncomingMessageObserver(private val context: Application, private val auth val hasProxy = SignalStore.proxy.isProxyEnabled val forceWebsocket = SignalStore.internal.isWebsocketModeForced val websocketAlreadyOpen = isConnectionAvailable() - val canProcessIncomingMessages = canProcessIncomingMessages() val lastInteractionString = if (appVisibleSnapshot) "N/A" else timeIdle.toString() + " ms (" + (if (timeIdle < maxBackgroundTime) "within limit" else "over limit") + ")" val conclusion = registered && (appVisibleSnapshot || timeIdle < maxBackgroundTime || !fcmEnabled) && - hasNetwork && - canProcessIncomingMessages + hasNetwork val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection" - Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Connected: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket, Can process messages: $canProcessIncomingMessages") + Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Connected: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket") return conclusion } @@ -234,14 +237,10 @@ class IncomingMessageObserver(private val context: Application, private val auth return authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED } - private fun canProcessIncomingMessages(): Boolean { - return !(RemoteConfig.restoreAfterRegistration && SignalStore.registration.restoreDecisionState.isDecisionPending) - } - private fun waitForConnectionNecessary() { try { connectionNecessarySemaphore.drainPermits() - while (!isConnectionNecessary() && !(isConnectionAvailable() && canProcessIncomingMessages())) { + while (!isConnectionNecessary() && !isConnectionAvailable()) { val numberDrained = connectionNecessarySemaphore.drainPermits() if (numberDrained == 0) { connectionNecessarySemaphore.acquire() @@ -354,12 +353,15 @@ class IncomingMessageObserver(private val context: Application, private val auth private inner class MessageRetrievalThread : Thread("MessageRetrievalService"), Thread.UncaughtExceptionHandler { private var sleepTimer: SleepTimer + private val canProcessMessages: Boolean init { Log.i(TAG, "Initializing! (${this.hashCode()})") uncaughtExceptionHandler = this sleepTimer = if (!SignalStore.account.fcmEnabled || SignalStore.internal.isWebsocketModeForced) AlarmSleepTimer(context) else UptimeSleepTimer() + + canProcessMessages = !(RemoteConfig.restoreAfterRegistration && SignalStore.registration.restoreDecisionState.isDecisionPending) } override fun run() { @@ -392,7 +394,7 @@ class IncomingMessageObserver(private val context: Application, private val auth try { authWebSocket.connect() var isConnectionNecessary = false - while (!terminated && canProcessIncomingMessages() && (isConnectionNecessary().also { isConnectionNecessary = it } || isConnectionAvailable())) { + while (!terminated && (isConnectionNecessary().also { isConnectionNecessary = it } || isConnectionAvailable())) { if (isConnectionNecessary) { authWebSocket.registerKeepAliveToken(WEB_SOCKET_KEEP_ALIVE_TOKEN) } else { @@ -400,50 +402,58 @@ class IncomingMessageObserver(private val context: Application, private val auth } try { - Log.d(TAG, "Reading message...") + if (canProcessMessages) { + Log.d(TAG, "Reading message...") - val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch -> - Log.i(TAG, "Retrieved ${batch.size} envelopes!") - val bufferedStore = BufferedProtocolStore.create() + val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch -> + Log.i(TAG, "Retrieved ${batch.size} envelopes!") + val bufferedStore = BufferedProtocolStore.create() - val startTime = System.currentTimeMillis() - GroupsV2ProcessingLock.acquireGroupProcessingLock().use { - ReentrantSessionLock.INSTANCE.acquire().use { - batch.forEach { response -> - Log.d(TAG, "Beginning database transaction...") - val followUpOperations = SignalDatabase.runInTransaction { db -> - val followUps: List? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp) - bufferedStore.flushToDisk() - followUps + val startTime = System.currentTimeMillis() + GroupsV2ProcessingLock.acquireGroupProcessingLock().use { + ReentrantSessionLock.INSTANCE.acquire().use { + batch.forEach { response -> + Log.d(TAG, "Beginning database transaction...") + val followUpOperations = SignalDatabase.runInTransaction { db -> + val followUps: List? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp) + bufferedStore.flushToDisk() + followUps + } + Log.d(TAG, "Ended database transaction.") + + if (followUpOperations != null) { + Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...") + val jobs = followUpOperations.mapNotNull { it.run() } + AppDependencies.jobManager.addAllChains(jobs) + } + + authWebSocket.sendAck(response) } - Log.d(TAG, "Ended database transaction.") - - if (followUpOperations != null) { - Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...") - val jobs = followUpOperations.mapNotNull { it.run() } - AppDependencies.jobManager.addAllChains(jobs) - } - - authWebSocket.sendAck(response) } } + val duration = System.currentTimeMillis() - startTime + val timePerMessage: Float = duration / batch.size.toFloat() + Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)") } - val duration = System.currentTimeMillis() - startTime - val timePerMessage: Float = duration / batch.size.toFloat() - Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)") - } - attempts = 0 - SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch() + attempts = 0 + SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch() - if (!hasMore && !decryptionDrained) { - Log.i(TAG, "Decryptions newly-drained.") - decryptionDrained = true + if (!hasMore && !decryptionDrained) { + Log.i(TAG, "Decryptions newly-drained.") + decryptionDrained = true - for (listener in decryptionDrainedListeners.toList()) { - listener.run() + for (listener in decryptionDrainedListeners.toList()) { + listener.run() + } + } else if (!hasMore) { + Log.w(TAG, "Got tombstone, but we thought the network was already drained!") } - } else if (!hasMore) { - Log.w(TAG, "Got tombstone, but we thought the network was already drained!") + } else { + Log.d(TAG, "Reading and dropping message...") + authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch -> + Log.w(TAG, "Retrieved ${batch.size} envelopes but dropping until we can finish backup restore.") + } + attempts = 0 } } catch (e: WebSocketUnavailableException) { Log.i(TAG, "Pipe unexpectedly unavailable, connecting")