Attempt to prevent message retry loops.

This commit is contained in:
Greyson Parrelli
2024-02-23 15:36:05 -05:00
parent dc32e51ac2
commit c4842ae7c5
14 changed files with 280 additions and 73 deletions

View File

@@ -30,6 +30,7 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.AppForegroundObserver
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.thoughtcrime.securesms.util.asChain
import org.whispersystems.signalservice.api.push.ServiceId
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException
@@ -294,7 +295,7 @@ class IncomingMessageObserver(private val context: Application) {
is MessageDecryptor.Result.Success -> {
val job = PushProcessMessageJob.processOrDefer(messageContentProcessor, result, localReceiveMetric)
if (job != null) {
return result.followUpOperations + FollowUpOperation { job }
return result.followUpOperations + FollowUpOperation { job.asChain() }
}
}
is MessageDecryptor.Result.Error -> {
@@ -303,7 +304,7 @@ class IncomingMessageObserver(private val context: Application) {
result.toMessageState(),
result.errorMetadata.toExceptionMetadata(),
result.envelope.timestamp!!
)
).asChain()
}
}
is MessageDecryptor.Result.Ignore -> {
@@ -404,7 +405,7 @@ class IncomingMessageObserver(private val context: Application) {
if (followUpOperations != null) {
Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...")
val jobs = followUpOperations.mapNotNull { it.run() }
ApplicationDependencies.getJobManager().addAll(jobs)
ApplicationDependencies.getJobManager().addAllChains(jobs)
}
signalWebSocket.sendAck(response)

View File

@@ -37,7 +37,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.groups.BadGroupIdException
import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JobManager
import org.thoughtcrime.securesms.jobs.AutomaticSessionResetJob
import org.thoughtcrime.securesms.jobs.PreKeysSyncJob
import org.thoughtcrime.securesms.jobs.SendRetryReceiptJob
@@ -50,6 +50,8 @@ import org.thoughtcrime.securesms.notifications.NotificationIds
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.FeatureFlags
import org.thoughtcrime.securesms.util.LRUCache
import org.thoughtcrime.securesms.util.asChain
import org.whispersystems.signalservice.api.InvalidMessageStructureException
import org.whispersystems.signalservice.api.crypto.ContentHint
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
@@ -77,6 +79,8 @@ object MessageDecryptor {
private val TAG = Log.tag(MessageDecryptor::class.java)
private val decryptionErrorCounts: MutableMap<RecipientId, DecryptionErrorCount> = LRUCache(100)
/**
* Decrypts an envelope and provides a [Result]. This method has side effects, but all of them are limited to [SignalDatabase].
* That means that this operation should be atomic when performed within a transaction.
@@ -125,8 +129,9 @@ object MessageDecryptor {
val followUpOperations: MutableList<FollowUpOperation> = mutableListOf()
if (envelope.type == Envelope.Type.PREKEY_BUNDLE) {
Log.i(TAG, "${logPrefix(envelope)} Prekey message. Scheduling a prekey sync job.")
followUpOperations += FollowUpOperation {
PreKeysSyncJob.create()
PreKeysSyncJob.create().asChain()
}
}
@@ -219,7 +224,7 @@ object MessageDecryptor {
followUpOperations += FollowUpOperation {
val sender: Recipient = Recipient.external(context, e.sender)
AutomaticSessionResetJob(sender.id, e.senderDevice, envelope.timestamp!!)
AutomaticSessionResetJob(sender.id, e.senderDevice, envelope.timestamp!!).asChain()
}
Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations.toUnmodifiableList())
@@ -277,36 +282,70 @@ object MessageDecryptor {
val senderDevice: Int = protocolException.senderDevice
val receivedTimestamp: Long = System.currentTimeMillis()
val sender: Recipient = Recipient.external(context, protocolException.sender)
val senderServiceId: ServiceId? = ServiceId.parseOrNull(protocolException.sender)
if (sender.isSelf) {
Log.w(TAG, "${logPrefix(envelope)} Decryption error for a sync message! Enqueuing a session reset job.")
Log.w(TAG, "${logPrefix(envelope)} Decryption error for a sync message! Enqueuing a session reset job.", true)
followUpOperations += FollowUpOperation {
AutomaticSessionResetJob(sender.id, senderDevice, envelope.timestamp!!)
AutomaticSessionResetJob(sender.id, senderDevice, envelope.timestamp!!).asChain()
}
return Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations)
}
val errorCount: DecryptionErrorCount = decryptionErrorCounts.getOrPut(sender.id) { DecryptionErrorCount(count = 0, lastReceivedTime = 0) }
val timeSinceLastError = receivedTimestamp - errorCount.lastReceivedTime
if (timeSinceLastError > FeatureFlags.retryReceiptMaxCountResetAge() && errorCount.count > 0) {
Log.i(TAG, "${logPrefix(envelope, senderServiceId)} Resetting decryption error count for ${sender.id} because it has been $timeSinceLastError ms since the last error.", true)
errorCount.count = 0
}
errorCount.count++
errorCount.lastReceivedTime = receivedTimestamp
if (errorCount.count > FeatureFlags.retryReceiptMaxCount()) {
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} This is error number ${errorCount.count} from ${sender.id}, which is greater than the maximum of ${FeatureFlags.retryReceiptMaxCount()}. Ignoring.", true)
if (contentHint == ContentHint.IMPLICIT) {
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so no error message is needed.", true)
Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations)
} else {
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so we need to insert an error right away.", true)
return Result.DecryptionError(envelope, serverDeliveredTimestamp, protocolException.toErrorMetadata(), followUpOperations.toUnmodifiableList())
}
} else {
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} This is error number ${errorCount.count} from ${sender.id}.${if (errorCount.count > 1) " It has been $timeSinceLastError ms since the last error." else "" }", true)
}
followUpOperations += FollowUpOperation {
buildSendRetryReceiptJob(envelope, protocolException, sender)
val retryJob = buildSendRetryReceiptJob(envelope, protocolException, sender)
// Note: if the message is sealed sender, it's envelope type will be UNIDENTIFIED_SENDER. The only way we can currently check if the error is
// prekey-related in that situation is using a string match.
if (envelope.type == Envelope.Type.PREKEY_BUNDLE || protocolException.message?.lowercase()?.contains("prekey") == true) {
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} Got a decryption error on a prekey message. Forcing a prekey rotation before requesting the retry.", true)
PreKeysSyncJob.create(forceRotationRequested = true).asChain().then(retryJob)
} else {
retryJob.asChain()
}
}
return when (contentHint) {
ContentHint.DEFAULT -> {
Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so we need to insert an error right away.", true)
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so we need to insert an error right away.", true)
Result.DecryptionError(envelope, serverDeliveredTimestamp, protocolException.toErrorMetadata(), followUpOperations.toUnmodifiableList())
}
ContentHint.RESENDABLE -> {
Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so we can try to resend the message.", true)
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so we can try to resend the message.", true)
followUpOperations += FollowUpOperation {
val groupId: GroupId? = protocolException.parseGroupId(envelope)
val threadId: Long? = if (groupId != null) {
if (SignalDatabase.groups.getGroup(groupId).isAbsent()) {
Log.w(TAG, "${logPrefix(envelope)} No group found for $groupId! Not inserting a retry receipt.")
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} No group found for $groupId! Not inserting a retry receipt.")
return@FollowUpOperation null
}
@@ -317,7 +356,7 @@ object MessageDecryptor {
}
if (threadId == null) {
Log.w(TAG, "${logPrefix(envelope)} Thread does not already exist for sender ${sender.id}! We will not create one just to show a retry receipt.")
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} Thread does not already exist for sender ${sender.id}! We will not create one just to show a retry receipt.")
return@FollowUpOperation null
}
@@ -330,7 +369,7 @@ object MessageDecryptor {
}
ContentHint.IMPLICIT -> {
Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so no error message is needed.", true)
Log.w(TAG, "${logPrefix(envelope, senderServiceId)} The content hint is $contentHint, so no error message is needed.", true)
Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations)
}
}
@@ -399,20 +438,24 @@ object MessageDecryptor {
}
private fun logPrefix(envelope: Envelope): String {
return logPrefix(envelope.timestamp!!, envelope.sourceServiceId ?: "<sealed>", envelope.sourceDevice)
return logPrefix(envelope.timestamp!!, ServiceId.parseOrNull(envelope.sourceServiceId)?.logString() ?: "<sealed>", envelope.sourceDevice)
}
private fun logPrefix(envelope: Envelope, sender: ServiceId): String {
return logPrefix(envelope.timestamp!!, sender.toString(), envelope.sourceDevice)
private fun logPrefix(envelope: Envelope, sender: ServiceId?): String {
return logPrefix(envelope.timestamp!!, sender?.logString() ?: "?", envelope.sourceDevice)
}
private fun logPrefix(envelope: Envelope, sender: String): String {
return logPrefix(envelope.timestamp!!, ServiceId.parseOrNull(sender)?.logString() ?: "?", envelope.sourceDevice)
}
private fun logPrefix(envelope: Envelope, cipherResult: SignalServiceCipherResult): String {
return logPrefix(envelope.timestamp!!, cipherResult.metadata.sourceServiceId.toString(), cipherResult.metadata.sourceDeviceId)
return logPrefix(envelope.timestamp!!, cipherResult.metadata.sourceServiceId.logString(), cipherResult.metadata.sourceDeviceId)
}
private fun logPrefix(envelope: Envelope, exception: ProtocolException): String {
return if (exception.sender != null) {
logPrefix(envelope.timestamp!!, exception.sender, exception.senderDevice)
logPrefix(envelope.timestamp!!, ServiceId.parseOrNull(exception.sender)?.logString() ?: "?", exception.senderDevice)
} else {
logPrefix(envelope.timestamp!!, envelope.sourceServiceId, envelope.sourceDevice)
}
@@ -546,7 +589,12 @@ object MessageDecryptor {
val groupId: GroupId?
)
data class DecryptionErrorCount(
var count: Int,
var lastReceivedTime: Long
)
fun interface FollowUpOperation {
fun run(): Job?
fun run(): JobManager.Chain?
}
}