Remove job-based decryption support and MCPv1.

This commit is contained in:
Cody Henthorne
2023-08-16 14:28:14 -04:00
committed by GitHub
parent 3d94122abc
commit fbf4de0ec5
43 changed files with 476 additions and 5165 deletions

View File

@@ -48,14 +48,13 @@ import org.thoughtcrime.securesms.jobs.PaymentLedgerUpdateJob
import org.thoughtcrime.securesms.jobs.PaymentTransactionCheckJob
import org.thoughtcrime.securesms.jobs.ProfileKeySendJob
import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
import org.thoughtcrime.securesms.jobs.RefreshAttributesJob
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob
import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob
import org.thoughtcrime.securesms.jobs.TrimThreadJob
import org.thoughtcrime.securesms.linkpreview.LinkPreview
import org.thoughtcrime.securesms.linkpreview.LinkPreviewUtil
import org.thoughtcrime.securesms.messages.MessageContentProcessor.StorageFailedException
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.debug
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
@@ -613,7 +612,7 @@ object DataMessageProcessor {
val paymentNotification = message.payment.notification
val uuid = UUID.randomUUID()
val queue = "Payment_" + PushProcessMessageJob.getQueueName(senderRecipientId)
val queue = "Payment_" + PushProcessMessageJobV2.getQueueName(senderRecipientId)
try {
SignalDatabase.payments.createIncomingPayment(

View File

@@ -34,6 +34,7 @@ import org.thoughtcrime.securesms.util.hasSharedContact
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.DataMessage
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope
import java.util.Optional
object EditMessageProcessor {
@@ -41,7 +42,7 @@ object EditMessageProcessor {
context: Context,
senderRecipient: Recipient,
threadRecipient: Recipient,
envelope: SignalServiceProtos.Envelope,
envelope: Envelope,
content: SignalServiceProtos.Content,
metadata: EnvelopeMetadata,
earlyMessageCacheEntry: EarlyMessageCacheEntry?
@@ -116,7 +117,7 @@ object EditMessageProcessor {
private fun handleEditMediaMessage(
senderRecipientId: RecipientId,
groupId: GroupId.V2?,
envelope: SignalServiceProtos.Envelope,
envelope: Envelope,
metadata: EnvelopeMetadata,
message: DataMessage,
targetMessage: MediaMmsMessageRecord
@@ -176,7 +177,7 @@ object EditMessageProcessor {
private fun handleEditTextMessage(
senderRecipientId: RecipientId,
groupId: GroupId.V2?,
envelope: SignalServiceProtos.Envelope,
envelope: Envelope,
metadata: EnvelopeMetadata,
message: DataMessage,
targetMessage: MediaMmsMessageRecord

View File

@@ -0,0 +1,12 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.messages
import org.thoughtcrime.securesms.groups.GroupId
/**
* Message processing exception metadata.
*/
class ExceptionMetadata @JvmOverloads constructor(val sender: String, val senderDevice: Int, val groupId: GroupId? = null)

View File

@@ -1,6 +1,5 @@
package org.thoughtcrime.securesms.messages
import android.annotation.SuppressLint
import android.app.Application
import android.app.Service
import android.content.Context
@@ -17,15 +16,11 @@ import org.thoughtcrime.securesms.crypto.ReentrantSessionLock
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.groups.GroupsV2ProcessingLock
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JobTracker
import org.thoughtcrime.securesms.jobmanager.JobTracker.JobListener
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil.startWhenCapable
import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageErrorV2Job
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
import org.thoughtcrime.securesms.jobs.UnableToStartException
import org.thoughtcrime.securesms.keyvalue.SignalStore
@@ -161,19 +156,6 @@ class IncomingMessageObserver(private val context: Application) {
decryptionDrainedListeners.remove(listener)
}
fun notifyDecryptionsDrained() {
if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) {
Log.i(TAG, "Queue was empty when notified. Signaling change.")
connectionNecessarySemaphore.release()
} else {
Log.i(TAG, "Queue still had items when notified. Registering listener to signal change.")
ApplicationDependencies.getJobManager().addListener(
{ it.parameters.queue == PushDecryptMessageJob.QUEUE },
DecryptionDrainedQueueListener()
)
}
}
private fun onAppForegrounded() {
lock.withLock {
appVisible = true
@@ -213,17 +195,15 @@ class IncomingMessageObserver(private val context: Application) {
val hasNetwork = NetworkConstraint.isMet(context)
val hasProxy = SignalStore.proxy().isProxyEnabled
val forceWebsocket = SignalStore.internalValues().isWebsocketModeForced
val decryptQueueEmpty = ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)
val lastInteractionString = if (appVisibleSnapshot) "N/A" else timeIdle.toString() + " ms (" + (if (timeIdle < maxBackgroundTime) "within limit" else "over limit") + ")"
val conclusion = registered &&
(appVisibleSnapshot || timeIdle < maxBackgroundTime || !fcmEnabled || keepAliveEntries.isNotEmpty()) &&
hasNetwork &&
decryptQueueEmpty
hasNetwork
val needsConnectionString = if (conclusion) "Needs Connection" else "Does Not Need Connection"
Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, Stay open requests: $keepAliveEntries, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket, Decrypt Queue Empty: $decryptQueueEmpty")
Log.d(TAG, "[$needsConnectionString] Network: $hasNetwork, Foreground: $appVisibleSnapshot, Time Since Last Interaction: $lastInteractionString, FCM: $fcmEnabled, Stay open requests: $keepAliveEntries, Registered: $registered, Proxy: $hasProxy, Force websocket: $forceWebsocket")
return conclusion
}
@@ -309,11 +289,9 @@ class IncomingMessageObserver(private val context: Application) {
}
is MessageDecryptor.Result.Error -> {
return result.followUpOperations + FollowUpOperation {
PushProcessMessageJob(
PushProcessMessageErrorV2Job(
result.toMessageState(),
null,
result.errorMetadata.toExceptionMetadata(),
-1,
result.envelope.timestamp
)
}
@@ -342,19 +320,19 @@ class IncomingMessageObserver(private val context: Application) {
SignalDatabase.messageLog.deleteEntryForRecipient(envelope.timestamp, senderId, envelope.sourceDevice)
}
private fun MessageDecryptor.Result.toMessageState(): MessageContentProcessor.MessageState {
private fun MessageDecryptor.Result.toMessageState(): MessageState {
return when (this) {
is MessageDecryptor.Result.DecryptionError -> MessageContentProcessor.MessageState.DECRYPTION_ERROR
is MessageDecryptor.Result.Ignore -> MessageContentProcessor.MessageState.NOOP
is MessageDecryptor.Result.InvalidVersion -> MessageContentProcessor.MessageState.INVALID_VERSION
is MessageDecryptor.Result.LegacyMessage -> MessageContentProcessor.MessageState.LEGACY_MESSAGE
is MessageDecryptor.Result.Success -> MessageContentProcessor.MessageState.DECRYPTED_OK
is MessageDecryptor.Result.UnsupportedDataMessage -> MessageContentProcessor.MessageState.UNSUPPORTED_DATA_MESSAGE
is MessageDecryptor.Result.DecryptionError -> MessageState.DECRYPTION_ERROR
is MessageDecryptor.Result.Ignore -> MessageState.NOOP
is MessageDecryptor.Result.InvalidVersion -> MessageState.INVALID_VERSION
is MessageDecryptor.Result.LegacyMessage -> MessageState.LEGACY_MESSAGE
is MessageDecryptor.Result.Success -> MessageState.DECRYPTED_OK
is MessageDecryptor.Result.UnsupportedDataMessage -> MessageState.UNSUPPORTED_DATA_MESSAGE
}
}
private fun MessageDecryptor.ErrorMetadata.toExceptionMetadata(): MessageContentProcessor.ExceptionMetadata {
return MessageContentProcessor.ExceptionMetadata(
private fun MessageDecryptor.ErrorMetadata.toExceptionMetadata(): ExceptionMetadata {
return ExceptionMetadata(
this.sender,
this.senderDevice,
this.groupId
@@ -463,21 +441,6 @@ class IncomingMessageObserver(private val context: Application) {
}
}
private inner class DecryptionDrainedQueueListener : JobListener {
@SuppressLint("WrongThread")
override fun onStateChanged(job: Job, jobState: JobTracker.JobState) {
if (jobState.isComplete) {
if (ApplicationDependencies.getJobManager().isQueueEmpty(PushDecryptMessageJob.QUEUE)) {
Log.i(TAG, "Queue is now empty. Signaling change.")
connectionNecessarySemaphore.release()
ApplicationDependencies.getJobManager().removeListener(this)
} else {
Log.i(TAG, "Item finished in queue, but it's still not empty. Waiting to signal change.")
}
}
}
}
class ForegroundService : Service() {
override fun onBind(intent: Intent?): IBinder? {
return null

View File

@@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.messages
import android.content.Context
import org.signal.core.util.logging.Log
import org.signal.core.util.orNull
import org.signal.core.util.toOptional
import org.signal.libsignal.protocol.SignalProtocolAddress
import org.signal.libsignal.protocol.ecc.ECPublicKey
import org.signal.libsignal.protocol.message.DecryptionErrorMessage
@@ -21,6 +22,7 @@ import org.thoughtcrime.securesms.groups.GroupNotAMemberException
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
import org.thoughtcrime.securesms.groups.GroupsV1MigrationUtil
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor
import org.thoughtcrime.securesms.jobs.AutomaticSessionResetJob
import org.thoughtcrime.securesms.jobs.NullMessageSendJob
import org.thoughtcrime.securesms.jobs.ResendMessageJob
import org.thoughtcrime.securesms.jobs.SenderKeyDistributionSendJob
@@ -38,6 +40,8 @@ import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.toDecryptionEr
import org.thoughtcrime.securesms.notifications.v2.ConversationId
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.sms.IncomingEncryptedMessage
import org.thoughtcrime.securesms.sms.IncomingTextMessage
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
import org.thoughtcrime.securesms.util.FeatureFlags
import org.thoughtcrime.securesms.util.SignalLocalMetrics
@@ -284,6 +288,29 @@ open class MessageContentProcessorV2(private val context: Context) {
null
}
}
private fun insertErrorMessage(context: Context, sender: Recipient, senderDevice: Int, timestamp: Long, groupId: Optional<GroupId>, marker: (Long) -> Unit) {
val textMessage = IncomingTextMessage(
sender.id,
senderDevice,
timestamp,
-1,
System.currentTimeMillis(),
"",
groupId,
0,
false,
null
)
SignalDatabase
.messages
.insertMessageInbox(IncomingEncryptedMessage(textMessage, ""))
.ifPresent {
marker(it.messageId)
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(it.threadId))
}
}
}
/**
@@ -304,7 +331,7 @@ open class MessageContentProcessorV2(private val context: Context) {
val earlyCacheEntries: List<EarlyMessageCacheEntry>? = ApplicationDependencies
.getEarlyMessageCache()
.retrieveV2(senderRecipient.id, envelope.timestamp)
.retrieve(senderRecipient.id, envelope.timestamp)
.orNull()
if (!processingEarlyContent && earlyCacheEntries != null) {
@@ -315,6 +342,63 @@ open class MessageContentProcessorV2(private val context: Context) {
}
}
fun processException(messageState: MessageState, exceptionMetadata: ExceptionMetadata, timestamp: Long) {
val sender = Recipient.external(context, exceptionMetadata.sender)
if (sender.isBlocked) {
warn("Ignoring exception content from blocked sender, message state: $messageState")
return
}
when (messageState) {
MessageState.DECRYPTION_ERROR -> {
warn(timestamp, "Handling encryption error.")
val threadRecipient = if (exceptionMetadata.groupId != null) Recipient.externalPossiblyMigratedGroup(exceptionMetadata.groupId) else sender
SignalDatabase
.messages
.insertBadDecryptMessage(
recipientId = sender.id,
senderDevice = exceptionMetadata.senderDevice,
sentTimestamp = timestamp,
receivedTimestamp = System.currentTimeMillis(),
threadId = SignalDatabase.threads.getOrCreateThreadIdFor(threadRecipient)
)
}
MessageState.INVALID_VERSION -> {
warn(timestamp, "Handling invalid version.")
insertErrorMessage(context, sender, exceptionMetadata.senderDevice, timestamp, exceptionMetadata.groupId.toOptional()) { messageId ->
SignalDatabase.messages.markAsInvalidVersionKeyExchange(messageId)
}
}
MessageState.LEGACY_MESSAGE -> {
warn(timestamp, "Handling legacy message.")
insertErrorMessage(context, sender, exceptionMetadata.senderDevice, timestamp, exceptionMetadata.groupId.toOptional()) { messageId ->
SignalDatabase.messages.markAsLegacyVersion(messageId)
}
}
MessageState.UNSUPPORTED_DATA_MESSAGE -> {
warn(timestamp, "Handling unsupported data message.")
insertErrorMessage(context, sender, exceptionMetadata.senderDevice, timestamp, exceptionMetadata.groupId.toOptional()) { messageId ->
SignalDatabase.messages.markAsUnsupportedProtocolVersion(messageId)
}
}
MessageState.CORRUPT_MESSAGE,
MessageState.NO_SESSION -> {
warn(timestamp, "Discovered old enqueued bad encrypted message. Scheduling reset.")
ApplicationDependencies.getJobManager().add(AutomaticSessionResetJob(sender.id, exceptionMetadata.senderDevice, timestamp))
}
MessageState.DUPLICATE_MESSAGE -> warn(timestamp, "Duplicate message. Dropping.")
else -> throw AssertionError("Not handled $messageState. ($timestamp)")
}
}
private fun handleMessage(
senderRecipient: Recipient,
envelope: Envelope,
@@ -350,6 +434,7 @@ open class MessageContentProcessorV2(private val context: Context) {
localMetric
)
}
content.hasSyncMessage() -> {
TextSecurePreferences.setMultiDevice(context, true)
@@ -362,6 +447,7 @@ open class MessageContentProcessorV2(private val context: Context) {
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
)
}
content.hasCallMessage() -> {
log(envelope.timestamp, "Got call message...")
@@ -375,6 +461,7 @@ open class MessageContentProcessorV2(private val context: Context) {
CallMessageProcessor.process(senderRecipient, envelope, content, metadata, serverDeliveredTimestamp)
}
content.hasReceiptMessage() -> {
ReceiptMessageProcessor.process(
context,
@@ -385,9 +472,11 @@ open class MessageContentProcessorV2(private val context: Context) {
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
)
}
content.hasTypingMessage() -> {
handleTypingMessage(envelope, metadata, content.typingMessage, senderRecipient)
}
content.hasStoryMessage() -> {
StoryMessageProcessor.process(
envelope,
@@ -397,9 +486,11 @@ open class MessageContentProcessorV2(private val context: Context) {
threadRecipient
)
}
content.hasDecryptionErrorMessage() -> {
handleRetryReceipt(envelope, metadata, content.decryptionErrorMessage!!.toDecryptionErrorMessage(metadata), senderRecipient)
}
content.hasEditMessage() -> {
EditMessageProcessor.process(
context,
@@ -411,9 +502,11 @@ open class MessageContentProcessorV2(private val context: Context) {
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
)
}
content.hasSenderKeyDistributionMessage() || content.hasPniSignatureMessage() -> {
// Already handled, here in order to prevent unrecognized message log
}
else -> {
warn(envelope.timestamp, "Got unrecognized message!")
}

View File

@@ -0,0 +1,21 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.messages;
/**
* Message processing state/result
*/
public enum MessageState {
DECRYPTED_OK,
INVALID_VERSION,
CORRUPT_MESSAGE, // Not used, but can't remove due to serialization
NO_SESSION, // Not used, but can't remove due to serialization
LEGACY_MESSAGE,
DUPLICATE_MESSAGE,
UNSUPPORTED_DATA_MESSAGE,
NOOP,
DECRYPTION_ERROR
}

View File

@@ -0,0 +1,25 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.messages;
class StorageFailedException extends Exception {
private final String sender;
private final int senderDevice;
StorageFailedException(Exception e, String sender, int senderDevice) {
super(e);
this.sender = sender;
this.senderDevice = senderDevice;
}
public String getSender() {
return sender;
}
public int getSenderDevice() {
return senderDevice;
}
}

View File

@@ -21,10 +21,11 @@ import org.thoughtcrime.securesms.util.Base64
import org.thoughtcrime.securesms.util.FeatureFlags
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope
object StoryMessageProcessor {
fun process(envelope: SignalServiceProtos.Envelope, content: SignalServiceProtos.Content, metadata: EnvelopeMetadata, senderRecipient: Recipient, threadRecipient: Recipient) {
fun process(envelope: Envelope, content: SignalServiceProtos.Content, metadata: EnvelopeMetadata, senderRecipient: Recipient, threadRecipient: Recipient) {
val storyMessage = content.storyMessage
log(envelope.timestamp, "Story message.")
@@ -79,7 +80,7 @@ object StoryMessageProcessor {
SignalDatabase.messages.setTransactionSuccessful()
}
} catch (e: MmsException) {
throw MessageContentProcessor.StorageFailedException(e, metadata.sourceServiceId.toString(), metadata.sourceDeviceId)
throw StorageFailedException(e, metadata.sourceServiceId.toString(), metadata.sourceDeviceId)
} finally {
SignalDatabase.messages.endTransaction()
}

View File

@@ -52,7 +52,6 @@ import org.thoughtcrime.securesms.jobs.RefreshOwnProfileJob
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.linkpreview.LinkPreview
import org.thoughtcrime.securesms.messages.MessageContentProcessor.StorageFailedException
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId

View File

@@ -10,7 +10,7 @@ import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JobTracker
import org.thoughtcrime.securesms.jobmanager.JobTracker.JobListener
import org.thoughtcrime.securesms.jobs.MarkerJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
import org.thoughtcrime.securesms.util.NetworkUtil
import org.thoughtcrime.securesms.util.PowerManagerCompat
import org.thoughtcrime.securesms.util.ServiceUtil
@@ -81,7 +81,7 @@ object WebSocketDrainer {
val queueListener = QueueFindingJobListener()
jobManager.addListener(
{ job: Job -> job.parameters.queue?.startsWith(PushProcessMessageJob.QUEUE_PREFIX) ?: false },
{ job: Job -> job.parameters.queue?.startsWith(PushProcessMessageJobV2.QUEUE_PREFIX) ?: false },
queueListener
)