diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt index bbd41ca9af..929d557966 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MessageTable.kt @@ -1032,15 +1032,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat type = type or MessageTypes.KEY_EXCHANGE_IDENTITY_DEFAULT_BIT } - val recipient = Recipient.resolved(message.authorId) - - val groupRecipient: Recipient? = if (message.groupId == null) { - null - } else { - val id = recipients.getOrInsertFromPossiblyMigratedGroupId(message.groupId!!) - Recipient.resolved(id) - } - val silent = message.isIdentityUpdate || message.isIdentityVerified || message.isIdentityDefault || @@ -1053,7 +1044,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat Util.isDefaultSmsProvider(context) ) - val threadId: Long = if (groupRecipient == null) threads.getOrCreateThreadIdFor(recipient) else threads.getOrCreateThreadIdFor(groupRecipient) + val threadId: Long = if (message.groupId == null) threads.getOrCreateThreadIdFor(message.authorId, false) else threads.getOrCreateThreadIdFor(RecipientId.from(message.groupId!!), true) if (tryToCollapseJoinRequestEvents) { val result = collapseJoinRequestEventsIfPossible(threadId, message as IncomingGroupUpdateMessage) @@ -1098,7 +1089,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat } if (message.subscriptionId != -1) { - recipients.setDefaultSubscriptionId(recipient.id, message.subscriptionId) + recipients.setDefaultSubscriptionId(message.authorId, message.subscriptionId) } writableDatabase.setTransactionSuccessful() } finally { diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadTable.kt index f7a7a735d1..1998a6b59c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadTable.kt @@ -1152,6 +1152,11 @@ class ThreadTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTa return threadId ?: createThreadForRecipient(recipient.id, recipient.isGroup, distributionType) } + fun getOrCreateThreadIdFor(recipientId: RecipientId, isGroup: Boolean, distributionType: Int = DistributionTypes.DEFAULT): Long { + val threadId = getThreadIdFor(recipientId) + return threadId ?: createThreadForRecipient(recipientId, isGroup, distributionType) + } + fun areThreadIdAndRecipientAssociated(threadId: Long, recipient: Recipient): Boolean { return readableDatabase .exists(TABLE_NAME) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJobV2.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJobV2.kt index 6aa7df239c..7902e7e565 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJobV2.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJobV2.kt @@ -1,18 +1,18 @@ package org.thoughtcrime.securesms.jobs -import androidx.annotation.WorkerThread import okio.ByteString import okio.ByteString.Companion.toByteString import org.signal.core.util.logging.Log import org.thoughtcrime.securesms.database.SignalDatabase.Companion.groups +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.groups.GroupChangeBusyException import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.ChangeNumberConstraint import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.messages.MessageContentProcessorV2 +import org.thoughtcrime.securesms.messages.MessageDecryptor import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId -import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.util.GroupUtil import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata @@ -34,14 +34,6 @@ class PushProcessMessageJobV2 private constructor( private val serverDeliveredTimestamp: Long ) : BaseJob(parameters) { - @WorkerThread - constructor( - envelope: Envelope, - content: Content, - metadata: EnvelopeMetadata, - serverDeliveredTimestamp: Long - ) : this(createParameters(content, metadata), envelope.toBuilder().clearContent().build(), content, metadata, serverDeliveredTimestamp) - override fun shouldTrace() = true override fun serialize(): ByteArray { @@ -107,40 +99,66 @@ class PushProcessMessageJobV2 private constructor( private val TAG = Log.tag(PushProcessMessageJobV2::class.java) + /** + * Cache to keep track of empty 1:1 processing queues. Once a 1:1 queue is empty + * we no longer enqueue jobs on it and instead process inline. This is not + * true for groups, as with groups we may have to do network fetches + * to get group state up to date. + */ + private val empty1to1QueueCache = HashSet() + private fun getQueueName(recipientId: RecipientId): String { return QUEUE_PREFIX + recipientId.toQueueKey() } - @WorkerThread - private fun createParameters(content: Content, metadata: EnvelopeMetadata): Parameters { + fun processOrDefer(messageProcessor: MessageContentProcessorV2, result: MessageDecryptor.Result.Success): PushProcessMessageJobV2? { val queueName: String - val builder = Parameters.Builder() - .setMaxAttempts(Parameters.UNLIMITED) - .addConstraint(ChangeNumberConstraint.KEY) - val groupContext = GroupUtil.getGroupContextIfPresent(content) + val groupContext = GroupUtil.getGroupContextIfPresent(result.content) val groupId = groupContext?.groupId + var requireNetwork = false - if (groupContext != null && groupId != null) { - queueName = getQueueName(Recipient.externalPossiblyMigratedGroup(groupId).id) + if (groupId != null) { + queueName = getQueueName(RecipientId.from(groupId)) if (groupId.isV2) { val localRevision = groups.getGroupV2Revision(groupId.requireV2()) if (groupContext.revision > localRevision || GroupsV1MigratedCache.hasV1Group(groupId)) { Log.i(TAG, "Adding network constraint to group-related job.") - builder.addConstraint(NetworkConstraint.KEY).setLifespan(TimeUnit.DAYS.toMillis(30)) + requireNetwork = true } } - } else if (content.hasSyncMessage() && content.syncMessage.hasSent() && content.syncMessage.sent.hasDestinationUuid()) { - queueName = getQueueName(RecipientId.from(ServiceId.parseOrThrow(content.syncMessage.sent.destinationUuid))) + } else if (result.content.hasSyncMessage() && result.content.syncMessage.hasSent() && result.content.syncMessage.sent.hasDestinationUuid()) { + queueName = getQueueName(RecipientId.from(ServiceId.parseOrThrow(result.content.syncMessage.sent.destinationUuid))) } else { - queueName = getQueueName(RecipientId.from(metadata.sourceServiceId)) + queueName = getQueueName(RecipientId.from(result.metadata.sourceServiceId)) } - builder.setQueue(queueName) + return if (requireNetwork || !isQueueEmpty(queueName = queueName, isGroup = groupId != null)) { + val builder = Parameters.Builder() + .setMaxAttempts(Parameters.UNLIMITED) + .addConstraint(ChangeNumberConstraint.KEY) + .setQueue(queueName) + if (requireNetwork) { + builder.addConstraint(NetworkConstraint.KEY).setLifespan(TimeUnit.DAYS.toMillis(30)) + } + PushProcessMessageJobV2(builder.build(), result.envelope.toBuilder().clearContent().build(), result.content, result.metadata, result.serverDeliveredTimestamp) + } else { + messageProcessor.process(result.envelope, result.content, result.metadata, result.serverDeliveredTimestamp) + null + } + } - return builder.build() + private fun isQueueEmpty(queueName: String, isGroup: Boolean): Boolean { + if (!isGroup && empty1to1QueueCache.contains(queueName)) { + return true + } + val queueEmpty = ApplicationDependencies.getJobManager().isQueueEmpty(queueName) + if (!isGroup && queueEmpty) { + empty1to1QueueCache.add(queueName) + } + return queueEmpty } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index 741b998635..b33a243c99 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2 import org.thoughtcrime.securesms.jobs.UnableToStartException import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation +import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId import org.thoughtcrime.securesms.messages.protocol.BufferedProtocolStore import org.thoughtcrime.securesms.notifications.NotificationChannels import org.thoughtcrime.securesms.recipients.RecipientId @@ -91,6 +92,8 @@ class IncomingMessageObserver(private val context: Application) { private val lock: ReentrantLock = ReentrantLock() private val connectionNecessarySemaphore = Semaphore(0) + private val messageContentProcessor = MessageContentProcessorV2(context) + private var appVisible = false private var lastInteractionTime: Long = System.currentTimeMillis() @@ -300,33 +303,33 @@ class IncomingMessageObserver(private val context: Application) { private fun processMessage(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List { val result = MessageDecryptor.decrypt(context, bufferedProtocolStore, envelope, serverDeliveredTimestamp) - - val extraJob: Job? = when (result) { + when (result) { is MessageDecryptor.Result.Success -> { - PushProcessMessageJobV2(result.envelope, result.content, result.metadata, result.serverDeliveredTimestamp) + val job = PushProcessMessageJobV2.processOrDefer(messageContentProcessor, result) + if (job != null) { + return result.followUpOperations + FollowUpOperation { job } + } } - is MessageDecryptor.Result.Error -> { - PushProcessMessageJob( - result.toMessageState(), - null, - result.errorMetadata.toExceptionMetadata(), - -1, - result.envelope.timestamp - ) + return result.followUpOperations + FollowUpOperation { + PushProcessMessageJob( + result.toMessageState(), + null, + result.errorMetadata.toExceptionMetadata(), + -1, + result.envelope.timestamp + ) + } } - is MessageDecryptor.Result.Ignore -> { // No action needed - null } - else -> { throw AssertionError("Unexpected result! ${result.javaClass.simpleName}") } } - return result.followUpOperations + FollowUpOperation { extraJob } + return result.followUpOperations } private fun processReceipt(envelope: SignalServiceProtos.Envelope) {