Do not process messages while pending restore decision.

This commit is contained in:
Cody Henthorne
2025-04-23 16:44:15 -04:00
parent 91140c41fd
commit a53479e50d
2 changed files with 59 additions and 46 deletions

View File

@@ -86,7 +86,10 @@ class RegistrationValues internal constructor(store: KeyValueStore) : SignalStor
store.beginWrite()
.putBlob(RESTORE_DECISION_STATE, newValue.encode())
.apply()
AppDependencies.incomingMessageObserver.notifyRegistrationStateChanged()
if (newValue.isTerminal) {
AppDependencies.incomingMessageObserver.notifyRestoreDecisionMade()
}
}
}
}

View File

@@ -174,6 +174,11 @@ class IncomingMessageObserver(private val context: Application, private val auth
connectionNecessarySemaphore.release()
}
fun notifyRestoreDecisionMade() {
Log.i(TAG, "Restore decision made, can restart network and process messages")
AppDependencies.resetNetwork()
}
fun addDecryptionDrainedListener(listener: Runnable) {
decryptionDrainedListeners.add(listener)
if (decryptionDrained) {
@@ -216,17 +221,15 @@ class IncomingMessageObserver(private val context: Application, private val auth
val hasProxy = SignalStore.proxy.isProxyEnabled
val forceWebsocket = SignalStore.internal.isWebsocketModeForced
val websocketAlreadyOpen = isConnectionAvailable()
val canProcessIncomingMessages = canProcessIncomingMessages()
val lastInteractionString = if (appVisibleSnapshot) "N/A" else timeIdle.toString() + " ms (" + (if (timeIdle < maxBackgroundTime) "within limit" else "over limit") + ")"
val conclusion = registered &&
(appVisibleSnapshot || timeIdle < maxBackgroundTime || !fcmEnabled) &&
hasNetwork &&
canProcessIncomingMessages
hasNetwork
val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection"
Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Connected: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket, Can process messages: $canProcessIncomingMessages")
Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, WS Connected: $websocketAlreadyOpen, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket")
return conclusion
}
@@ -234,14 +237,10 @@ class IncomingMessageObserver(private val context: Application, private val auth
return authWebSocket.stateSnapshot == WebSocketConnectionState.CONNECTED
}
private fun canProcessIncomingMessages(): Boolean {
return !(RemoteConfig.restoreAfterRegistration && SignalStore.registration.restoreDecisionState.isDecisionPending)
}
private fun waitForConnectionNecessary() {
try {
connectionNecessarySemaphore.drainPermits()
while (!isConnectionNecessary() && !(isConnectionAvailable() && canProcessIncomingMessages())) {
while (!isConnectionNecessary() && !isConnectionAvailable()) {
val numberDrained = connectionNecessarySemaphore.drainPermits()
if (numberDrained == 0) {
connectionNecessarySemaphore.acquire()
@@ -354,12 +353,15 @@ class IncomingMessageObserver(private val context: Application, private val auth
private inner class MessageRetrievalThread : Thread("MessageRetrievalService"), Thread.UncaughtExceptionHandler {
private var sleepTimer: SleepTimer
private val canProcessMessages: Boolean
init {
Log.i(TAG, "Initializing! (${this.hashCode()})")
uncaughtExceptionHandler = this
sleepTimer = if (!SignalStore.account.fcmEnabled || SignalStore.internal.isWebsocketModeForced) AlarmSleepTimer(context) else UptimeSleepTimer()
canProcessMessages = !(RemoteConfig.restoreAfterRegistration && SignalStore.registration.restoreDecisionState.isDecisionPending)
}
override fun run() {
@@ -392,7 +394,7 @@ class IncomingMessageObserver(private val context: Application, private val auth
try {
authWebSocket.connect()
var isConnectionNecessary = false
while (!terminated && canProcessIncomingMessages() && (isConnectionNecessary().also { isConnectionNecessary = it } || isConnectionAvailable())) {
while (!terminated && (isConnectionNecessary().also { isConnectionNecessary = it } || isConnectionAvailable())) {
if (isConnectionNecessary) {
authWebSocket.registerKeepAliveToken(WEB_SOCKET_KEEP_ALIVE_TOKEN)
} else {
@@ -400,50 +402,58 @@ class IncomingMessageObserver(private val context: Application, private val auth
}
try {
Log.d(TAG, "Reading message...")
if (canProcessMessages) {
Log.d(TAG, "Reading message...")
val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch ->
Log.i(TAG, "Retrieved ${batch.size} envelopes!")
val bufferedStore = BufferedProtocolStore.create()
val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch ->
Log.i(TAG, "Retrieved ${batch.size} envelopes!")
val bufferedStore = BufferedProtocolStore.create()
val startTime = System.currentTimeMillis()
GroupsV2ProcessingLock.acquireGroupProcessingLock().use {
ReentrantSessionLock.INSTANCE.acquire().use {
batch.forEach { response ->
Log.d(TAG, "Beginning database transaction...")
val followUpOperations = SignalDatabase.runInTransaction { db ->
val followUps: List<FollowUpOperation>? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp)
bufferedStore.flushToDisk()
followUps
val startTime = System.currentTimeMillis()
GroupsV2ProcessingLock.acquireGroupProcessingLock().use {
ReentrantSessionLock.INSTANCE.acquire().use {
batch.forEach { response ->
Log.d(TAG, "Beginning database transaction...")
val followUpOperations = SignalDatabase.runInTransaction { db ->
val followUps: List<FollowUpOperation>? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp)
bufferedStore.flushToDisk()
followUps
}
Log.d(TAG, "Ended database transaction.")
if (followUpOperations != null) {
Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...")
val jobs = followUpOperations.mapNotNull { it.run() }
AppDependencies.jobManager.addAllChains(jobs)
}
authWebSocket.sendAck(response)
}
Log.d(TAG, "Ended database transaction.")
if (followUpOperations != null) {
Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...")
val jobs = followUpOperations.mapNotNull { it.run() }
AppDependencies.jobManager.addAllChains(jobs)
}
authWebSocket.sendAck(response)
}
}
val duration = System.currentTimeMillis() - startTime
val timePerMessage: Float = duration / batch.size.toFloat()
Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)")
}
val duration = System.currentTimeMillis() - startTime
val timePerMessage: Float = duration / batch.size.toFloat()
Log.d(TAG, "Decrypted ${batch.size} envelopes in $duration ms (~${round(timePerMessage * 100) / 100} ms per message)")
}
attempts = 0
SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch()
attempts = 0
SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch()
if (!hasMore && !decryptionDrained) {
Log.i(TAG, "Decryptions newly-drained.")
decryptionDrained = true
if (!hasMore && !decryptionDrained) {
Log.i(TAG, "Decryptions newly-drained.")
decryptionDrained = true
for (listener in decryptionDrainedListeners.toList()) {
listener.run()
for (listener in decryptionDrainedListeners.toList()) {
listener.run()
}
} else if (!hasMore) {
Log.w(TAG, "Got tombstone, but we thought the network was already drained!")
}
} else if (!hasMore) {
Log.w(TAG, "Got tombstone, but we thought the network was already drained!")
} else {
Log.d(TAG, "Reading and dropping message...")
authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch ->
Log.w(TAG, "Retrieved ${batch.size} envelopes but dropping until we can finish backup restore.")
}
attempts = 0
}
} catch (e: WebSocketUnavailableException) {
Log.i(TAG, "Pipe unexpectedly unavailable, connecting")