Improve incoming group receipt processing.

This commit is contained in:
Cody Henthorne
2026-03-06 10:37:01 -05:00
committed by jeffrey-signal
parent da2eb02cde
commit bb730c137f
3 changed files with 60 additions and 43 deletions

View File

@@ -5002,8 +5002,8 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
.run()
}
fun incrementDeliveryReceiptCounts(targetTimestamps: List<Long>, receiptAuthor: RecipientId, receiptSentTimestamp: Long, stopwatch: Stopwatch? = null): Set<Long> {
return incrementReceiptCounts(targetTimestamps, receiptAuthor, receiptSentTimestamp, ReceiptType.DELIVERY, stopwatch = stopwatch)
fun incrementDeliveryReceiptCounts(targetTimestamps: List<Long>, receiptAuthor: RecipientId, receiptSentTimestamp: Long, stopwatch: Stopwatch? = null, receiptDataCache: MutableMap<Long, ReceiptData>? = null): Set<Long> {
return incrementReceiptCounts(targetTimestamps, receiptAuthor, receiptSentTimestamp, ReceiptType.DELIVERY, stopwatch = stopwatch, receiptDataCache = receiptDataCache)
}
fun incrementDeliveryReceiptCount(targetTimestamps: Long, receiptAuthor: RecipientId, receiptSentTimestamp: Long): Boolean {
@@ -5013,8 +5013,8 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
/**
* @return A list of ID's that were not updated.
*/
fun incrementReadReceiptCounts(targetTimestamps: List<Long>, receiptAuthor: RecipientId, receiptSentTimestamp: Long): Set<Long> {
return incrementReceiptCounts(targetTimestamps, receiptAuthor, receiptSentTimestamp, ReceiptType.READ)
fun incrementReadReceiptCounts(targetTimestamps: List<Long>, receiptAuthor: RecipientId, receiptSentTimestamp: Long, receiptDataCache: MutableMap<Long, ReceiptData>? = null): Set<Long> {
return incrementReceiptCounts(targetTimestamps, receiptAuthor, receiptSentTimestamp, ReceiptType.READ, receiptDataCache = receiptDataCache)
}
fun incrementReadReceiptCount(targetTimestamps: Long, receiptAuthor: RecipientId, receiptSentTimestamp: Long): Boolean {
@@ -5092,13 +5092,13 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
*
* @return All of the target timestamps that couldn't be found in the table.
*/
private fun incrementReceiptCounts(targetTimestamps: List<Long>, receiptAuthor: RecipientId, receiptSentTimestamp: Long, receiptType: ReceiptType, messageQualifier: MessageQualifier = MessageQualifier.ALL, stopwatch: Stopwatch? = null): Set<Long> {
private fun incrementReceiptCounts(targetTimestamps: List<Long>, receiptAuthor: RecipientId, receiptSentTimestamp: Long, receiptType: ReceiptType, messageQualifier: MessageQualifier = MessageQualifier.ALL, stopwatch: Stopwatch? = null, receiptDataCache: MutableMap<Long, ReceiptData>? = null): Set<Long> {
val messageUpdates: MutableSet<MessageReceiptUpdate> = HashSet()
val missingTargetTimestamps: MutableSet<Long> = HashSet()
writableDatabase.withinTransaction {
for (targetTimestamp in targetTimestamps) {
val updates: Set<MessageReceiptUpdate> = incrementReceiptCountInternal(targetTimestamp, receiptAuthor, receiptSentTimestamp, receiptType, messageQualifier, stopwatch)
val updates: Set<MessageReceiptUpdate> = incrementReceiptCountInternal(targetTimestamp, receiptAuthor, receiptSentTimestamp, receiptType, messageQualifier, stopwatch, receiptDataCache)
if (updates.isNotEmpty()) {
messageUpdates += updates
} else {
@@ -5131,7 +5131,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
return missingTargetTimestamps
}
private fun incrementReceiptCountInternal(targetTimestamp: Long, receiptAuthor: RecipientId, receiptSentTimestamp: Long, receiptType: ReceiptType, messageQualifier: MessageQualifier, stopwatch: Stopwatch? = null): Set<MessageReceiptUpdate> {
private fun incrementReceiptCountInternal(targetTimestamp: Long, receiptAuthor: RecipientId, receiptSentTimestamp: Long, receiptType: ReceiptType, messageQualifier: MessageQualifier, stopwatch: Stopwatch? = null, receiptDataCache: MutableMap<Long, ReceiptData>? = null): Set<MessageReceiptUpdate> {
val qualifierWhere: String = when (messageQualifier) {
MessageQualifier.NORMAL -> " AND NOT ($IS_STORY_CLAUSE)"
MessageQualifier.STORY -> " AND $IS_STORY_CLAUSE"
@@ -5139,39 +5139,45 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
}
// Note: While it is true that multiple messages can have the same (sent, author) pair, this should only happen for stories, which are handled below.
val receiptData: ReceiptData? = readableDatabase
.select(ID, THREAD_ID, STORY_TYPE, receiptType.columnName, TO_RECIPIENT_ID)
.from(TABLE_NAME)
.where(
"""
$DATE_SENT = $targetTimestamp AND
$FROM_RECIPIENT_ID = ? AND
(
$TO_RECIPIENT_ID = ? OR
EXISTS (
SELECT 1
FROM ${RecipientTable.TABLE_NAME}
WHERE
${RecipientTable.TABLE_NAME}.${RecipientTable.ID} = $TO_RECIPIENT_ID AND
${RecipientTable.TABLE_NAME}.${RecipientTable.TYPE} != ${RecipientTable.RecipientType.INDIVIDUAL.id}
val receiptData: ReceiptData? = receiptDataCache?.get(targetTimestamp)
?: readableDatabase
.select(ID, THREAD_ID, STORY_TYPE, receiptType.columnName, TO_RECIPIENT_ID)
.from(TABLE_NAME)
.where(
"""
$DATE_SENT = $targetTimestamp AND
$FROM_RECIPIENT_ID = ? AND
(
$TO_RECIPIENT_ID = ? OR
EXISTS (
SELECT 1
FROM ${RecipientTable.TABLE_NAME}
WHERE
${RecipientTable.TABLE_NAME}.${RecipientTable.ID} = $TO_RECIPIENT_ID AND
${RecipientTable.TABLE_NAME}.${RecipientTable.TYPE} != ${RecipientTable.RecipientType.INDIVIDUAL.id}
)
)
$qualifierWhere
""",
Recipient.self().id,
receiptAuthor
)
$qualifierWhere
""",
Recipient.self().id,
receiptAuthor
)
.limit(1)
.run()
.readToSingleObject { cursor ->
ReceiptData(
messageId = cursor.requireLong(ID),
threadId = cursor.requireLong(THREAD_ID),
storyType = StoryType.fromCode(cursor.requireInt(STORY_TYPE)),
marked = cursor.requireBoolean(receiptType.columnName),
forIndividualChat = cursor.requireLong(TO_RECIPIENT_ID) == receiptAuthor.toLong()
)
}
.limit(1)
.run()
.readToSingleObject { cursor ->
ReceiptData(
messageId = cursor.requireLong(ID),
threadId = cursor.requireLong(THREAD_ID),
storyType = StoryType.fromCode(cursor.requireInt(STORY_TYPE)),
marked = cursor.requireBoolean(receiptType.columnName),
forIndividualChat = cursor.requireLong(TO_RECIPIENT_ID) == receiptAuthor.toLong()
)
}
.also { result ->
if (result != null && !result.forIndividualChat) {
receiptDataCache?.put(targetTimestamp, result)
}
}
stopwatch?.split("receipt-query")
@@ -5191,11 +5197,15 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
UPDATE $TABLE_NAME
SET
${receiptType.columnName} = 1,
$RECEIPT_TIMESTAMP = MAX($RECEIPT_TIMESTAMP, $receiptSentTimestamp)
$RECEIPT_TIMESTAMP = MAX($RECEIPT_TIMESTAMP, $receiptSentTimestamp)
WHERE
$ID = ${receiptData.messageId}
"""
)
if (receiptDataCache?.containsKey(targetTimestamp) == true) {
receiptDataCache[targetTimestamp] = receiptData.copy(marked = true)
}
}
stopwatch?.split("receipt-update")

View File

@@ -7,6 +7,7 @@ package org.thoughtcrime.securesms.messages
import org.signal.libsignal.zkgroup.groups.GroupMasterKey
import org.signal.libsignal.zkgroup.groups.GroupSecretParams
import org.thoughtcrime.securesms.database.MessageTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.GroupRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
@@ -38,6 +39,9 @@ abstract class BatchCache {
val groupRevisionCache = HashMap<GroupId, Int>(BATCH_SIZE)
val groupRecordCache = HashMap<GroupId.V2, Optional<GroupRecord>>(BATCH_SIZE)
val deliveryReceiptLookupCache = HashMap<Long, MessageTable.ReceiptData>(BATCH_SIZE)
val readReceiptLookupCache = HashMap<Long, MessageTable.ReceiptData>(BATCH_SIZE)
protected val groupSecretParamsAndIdCache = HashMap<GroupMasterKey, Pair<GroupSecretParams, GroupId.V2>>(BATCH_SIZE)
fun getGroupInfo(message: DataMessage): Pair<GroupSecretParams?, GroupId.V2?> {
@@ -56,6 +60,8 @@ abstract class BatchCache {
groupRevisionCache.clear()
groupRecordCache.clear()
groupSecretParamsAndIdCache.clear()
deliveryReceiptLookupCache.clear()
readReceiptLookupCache.clear()
}
protected fun flushJob(job: Job) {

View File

@@ -29,7 +29,7 @@ object ReceiptMessageProcessor {
when (receiptMessage.type) {
ReceiptMessage.Type.DELIVERY -> handleDeliveryReceipt(envelope, metadata, receiptMessage, senderRecipient.id, batchCache)
ReceiptMessage.Type.READ -> handleReadReceipt(context, senderRecipient.id, envelope, metadata, receiptMessage, earlyMessageCacheEntry)
ReceiptMessage.Type.READ -> handleReadReceipt(context, senderRecipient.id, envelope, metadata, receiptMessage, earlyMessageCacheEntry, batchCache)
ReceiptMessage.Type.VIEWED -> handleViewedReceipt(context, envelope, metadata, receiptMessage, senderRecipient.id, earlyMessageCacheEntry)
else -> warn(envelope.timestamp!!, "Unknown recipient message type ${receiptMessage.type}")
}
@@ -47,7 +47,7 @@ object ReceiptMessageProcessor {
val stopwatch: Stopwatch? = if (VERBOSE) Stopwatch("delivery-receipt", decimalPlaces = 2) else null
SignalTrace.beginSection("ReceiptMessageProcessor#incrementDeliveryReceiptCounts")
val missingTargetTimestamps: Set<Long> = SignalDatabase.messages.incrementDeliveryReceiptCounts(deliveryReceipt.timestamp, senderRecipientId, envelope.timestamp!!, stopwatch)
val missingTargetTimestamps: Set<Long> = SignalDatabase.messages.incrementDeliveryReceiptCounts(deliveryReceipt.timestamp, senderRecipientId, envelope.timestamp!!, stopwatch, batchCache.deliveryReceiptLookupCache)
SignalTrace.endSection()
for (targetTimestamp in missingTargetTimestamps) {
@@ -75,7 +75,8 @@ object ReceiptMessageProcessor {
envelope: Envelope,
metadata: EnvelopeMetadata,
readReceipt: ReceiptMessage,
earlyMessageCacheEntry: EarlyMessageCacheEntry?
earlyMessageCacheEntry: EarlyMessageCacheEntry?,
batchCache: BatchCache
) {
if (!TextSecurePreferences.isReadReceiptsEnabled(context)) {
log(envelope.timestamp!!, "Ignoring read receipts for IDs: " + readReceipt.timestamp.joinToString(", "))
@@ -85,7 +86,7 @@ object ReceiptMessageProcessor {
log(envelope.timestamp!!, "Processing read receipts. Sender: $senderRecipientId, Device: ${metadata.sourceDeviceId}, Timestamps: ${readReceipt.timestamp.joinToString(", ")}")
SignalTrace.beginSection("ReceiptMessageProcessor#incrementReadReceiptCounts")
val missingTargetTimestamps: Set<Long> = SignalDatabase.messages.incrementReadReceiptCounts(readReceipt.timestamp, senderRecipientId, envelope.timestamp!!)
val missingTargetTimestamps: Set<Long> = SignalDatabase.messages.incrementReadReceiptCounts(readReceipt.timestamp, senderRecipientId, envelope.timestamp!!, batchCache.readReceiptLookupCache)
SignalTrace.endSection()
if (missingTargetTimestamps.isNotEmpty()) {