Add receipt processing benchmark tests.

This commit is contained in:
Cody Henthorne
2026-02-25 08:59:05 -05:00
parent b6dd4a3579
commit c06944da13
13 changed files with 375 additions and 75 deletions

View File

@@ -14,6 +14,7 @@ import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupMasterKey
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.hasGroupContext
import org.thoughtcrime.securesms.recipients.RecipientId
import org.whispersystems.signalservice.internal.push.DataMessage
import java.util.Optional
@@ -65,8 +66,13 @@ abstract class BatchCache {
SignalDatabase.threads.updateForMessageInsert(threadId, unarchive = true)
}
protected fun flushMslDelete(recipientId: RecipientId, device: Int, timestamps: List<Long>) {
SignalDatabase.messageLog.deleteEntriesForRecipient(timestamps, recipientId, device)
}
abstract fun addJob(job: Job)
abstract fun addIncomingMessageInsertThreadUpdate(threadId: Long)
abstract fun addMslDelete(recipientId: RecipientId, device: Int, timestamps: List<Long>)
}
/**
@@ -83,6 +89,10 @@ class OneTimeBatchCache : BatchCache() {
override fun addIncomingMessageInsertThreadUpdate(threadId: Long) {
flushIncomingMessageInsertThreadUpdate(threadId)
}
override fun addMslDelete(recipientId: RecipientId, device: Int, timestamps: List<Long>) {
flushMslDelete(recipientId, device, timestamps)
}
}
/**
@@ -100,6 +110,7 @@ class ReusedBatchCache : BatchCache() {
private val batchedJobs = ArrayList<Job>(BATCH_SIZE)
private val threadUpdates = HashSet<Long>(BATCH_SIZE)
private val mslDeletes = HashMap<Pair<RecipientId, Int>, MutableList<Long>>(BATCH_SIZE)
override fun addJob(job: Job) {
batchedJobs += job
@@ -109,6 +120,10 @@ class ReusedBatchCache : BatchCache() {
threadUpdates += threadId
}
override fun addMslDelete(recipientId: RecipientId, device: Int, timestamps: List<Long>) {
mslDeletes.getOrPut(recipientId to device) { mutableListOf() } += timestamps
}
override fun flushAndClear() {
super.flushAndClear()
@@ -123,5 +138,12 @@ class ReusedBatchCache : BatchCache() {
}
}
threadUpdates.clear()
if (mslDeletes.isNotEmpty()) {
SignalDatabase.runInTransaction {
mslDeletes.forEach { (key, timestamps) -> flushMslDelete(key.first, key.second, timestamps) }
}
}
mslDeletes.clear()
}
}

View File

@@ -496,7 +496,8 @@ open class MessageContentProcessor(private val context: Context) {
envelope,
content,
metadata,
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp),
batchCache
)
}

View File

@@ -12,6 +12,7 @@ import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.war
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
import org.thoughtcrime.securesms.util.SignalTrace
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
import org.whispersystems.signalservice.internal.push.Content
@@ -23,11 +24,11 @@ object ReceiptMessageProcessor {
private const val VERBOSE = false
fun process(context: Context, senderRecipient: Recipient, envelope: Envelope, content: Content, metadata: EnvelopeMetadata, earlyMessageCacheEntry: EarlyMessageCacheEntry?) {
fun process(context: Context, senderRecipient: Recipient, envelope: Envelope, content: Content, metadata: EnvelopeMetadata, earlyMessageCacheEntry: EarlyMessageCacheEntry?, batchCache: BatchCache) {
val receiptMessage = content.receiptMessage!!
when (receiptMessage.type) {
ReceiptMessage.Type.DELIVERY -> handleDeliveryReceipt(envelope, metadata, receiptMessage, senderRecipient.id)
ReceiptMessage.Type.DELIVERY -> handleDeliveryReceipt(envelope, metadata, receiptMessage, senderRecipient.id, batchCache)
ReceiptMessage.Type.READ -> handleReadReceipt(context, senderRecipient.id, envelope, metadata, receiptMessage, earlyMessageCacheEntry)
ReceiptMessage.Type.VIEWED -> handleViewedReceipt(context, envelope, metadata, receiptMessage, senderRecipient.id, earlyMessageCacheEntry)
else -> warn(envelope.timestamp!!, "Unknown recipient message type ${receiptMessage.type}")
@@ -39,12 +40,15 @@ object ReceiptMessageProcessor {
envelope: Envelope,
metadata: EnvelopeMetadata,
deliveryReceipt: ReceiptMessage,
senderRecipientId: RecipientId
senderRecipientId: RecipientId,
batchCache: BatchCache
) {
log(envelope.timestamp!!, "Processing delivery receipts. Sender: $senderRecipientId, Device: ${metadata.sourceDeviceId}, Timestamps: ${deliveryReceipt.timestamp.joinToString(", ")}")
val stopwatch: Stopwatch? = if (VERBOSE) Stopwatch("delivery-receipt", decimalPlaces = 2) else null
SignalTrace.beginSection("ReceiptMessageProcessor#incrementDeliveryReceiptCounts")
val missingTargetTimestamps: Set<Long> = SignalDatabase.messages.incrementDeliveryReceiptCounts(deliveryReceipt.timestamp, senderRecipientId, envelope.timestamp!!, stopwatch)
SignalTrace.endSection()
for (targetTimestamp in missingTargetTimestamps) {
warn(envelope.timestamp!!, "[handleDeliveryReceipt] Could not find matching message! targetTimestamp: $targetTimestamp, receiptAuthor: $senderRecipientId")
@@ -58,7 +62,7 @@ object ReceiptMessageProcessor {
SignalDatabase.pendingPniSignatureMessages.acknowledgeReceipts(senderRecipientId, deliveryReceipt.timestamp, metadata.sourceDeviceId)
stopwatch?.split("pni-signatures")
SignalDatabase.messageLog.deleteEntriesForRecipient(deliveryReceipt.timestamp, senderRecipientId, metadata.sourceDeviceId)
batchCache.addMslDelete(senderRecipientId, metadata.sourceDeviceId, deliveryReceipt.timestamp)
stopwatch?.split("msl")
stopwatch?.stop(TAG)
@@ -80,7 +84,9 @@ object ReceiptMessageProcessor {
log(envelope.timestamp!!, "Processing read receipts. Sender: $senderRecipientId, Device: ${metadata.sourceDeviceId}, Timestamps: ${readReceipt.timestamp.joinToString(", ")}")
SignalTrace.beginSection("ReceiptMessageProcessor#incrementReadReceiptCounts")
val missingTargetTimestamps: Set<Long> = SignalDatabase.messages.incrementReadReceiptCounts(readReceipt.timestamp, senderRecipientId, envelope.timestamp!!)
SignalTrace.endSection()
if (missingTargetTimestamps.isNotEmpty()) {
val selfId = Recipient.self().id