Fix incorrect transaction batching during conversation delete.

This commit is contained in:
Cody Henthorne
2026-03-03 09:46:39 -05:00
committed by Greyson Parrelli
parent 7fbcd17759
commit e23d575460
8 changed files with 304 additions and 60 deletions

View File

@@ -56,6 +56,13 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
BenchmarkWebSocketConnection.startWholeBatchTrace()
BenchmarkWebSocketConnection.releaseMessages()
}
"delete-thread" -> {
val pendingResult = goAsync()
Thread {
handleDeleteThread()
pendingResult.finish()
}.start()
}
else -> Log.w(TAG, "Unknown command: $command")
}
}
@@ -144,6 +151,20 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
}
}
private fun handleDeleteThread() {
val threadId = SignalDatabase.threads.getRecentConversationList(1, false, false).use { cursor ->
if (cursor.moveToFirst()) {
cursor.getLong(cursor.getColumnIndexOrThrow("_id"))
} else {
Log.w(TAG, "No active threads found for deletion benchmark")
return
}
}
Log.i(TAG, "Deleting thread $threadId")
SignalDatabase.threads.deleteConversation(threadId, syncThreadDelete = false)
Log.i(TAG, "Thread $threadId deleted")
}
private fun getOutgoingGroupMessageTimestamps(): List<Long> {
val groupId = GroupId.v2(Harness.groupMasterKey)
val groupRecipient = Recipient.externalGroupExact(groupId)

View File

@@ -1,7 +1,15 @@
package org.signal.benchmark
import android.os.Bundle
import android.widget.TextView
import androidx.activity.compose.setContent
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.Text
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.setValue
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.signal.benchmark.setup.TestMessages
import org.signal.benchmark.setup.TestUsers
import org.thoughtcrime.securesms.BaseActivity
@@ -15,19 +23,29 @@ class BenchmarkSetupActivity : BaseActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
when (intent.extras!!.getString("setup-type")) {
"cold-start" -> setupColdStart()
"conversation-open" -> setupConversationOpen()
"message-send" -> setupMessageSend()
"group-message-send" -> setupGroupMessageSend()
"group-delivery-receipt" -> setupGroupReceipt(includeMsl = true)
"group-read-receipt" -> setupGroupReceipt(enableReadReceipts = true)
var setupComplete by mutableStateOf(false)
setContent {
if (setupComplete) {
Text("done")
} else {
CircularProgressIndicator()
}
}
val textView: TextView = TextView(this).apply {
text = "done"
lifecycleScope.launch(Dispatchers.IO) {
when (intent.extras!!.getString("setup-type")) {
"cold-start" -> setupColdStart()
"conversation-open" -> setupConversationOpen()
"message-send" -> setupMessageSend()
"group-message-send" -> setupGroupMessageSend()
"group-delivery-receipt" -> setupGroupReceipt(includeMsl = true)
"group-read-receipt" -> setupGroupReceipt(enableReadReceipts = true)
"thread-delete" -> setupThreadDelete()
"thread-delete-group" -> setupThreadDeleteGroup()
}
setupComplete = true
}
setContentView(textView)
}
private fun setupColdStart() {
@@ -74,6 +92,65 @@ class BenchmarkSetupActivity : BaseActivity() {
TestUsers.setupGroup()
}
private fun setupThreadDelete() {
TestUsers.setupSelf()
val recipientIds = TestUsers.setupTestRecipients(2)
val recipient = Recipient.resolved(recipientIds[0])
val reactionAuthor = recipientIds[1]
val messagesToAdd = 20_000
val generator = TestMessages.TimestampGenerator(System.currentTimeMillis() - (messagesToAdd * 2000L) - 60_000L)
for (i in 0 until messagesToAdd) {
val timestamp = generator.nextTimestamp()
when {
i % 20 == 0 -> TestMessages.insertIncomingVoiceMessage(other = recipient, timestamp = timestamp)
i % 4 == 0 -> TestMessages.insertIncomingImageMessage(other = recipient, attachmentCount = 1, timestamp = timestamp)
else -> TestMessages.insertIncomingTextMessage(other = recipient, body = "Message $i", timestamp = timestamp)
}
}
val threadId = SignalDatabase.threads.getOrCreateThreadIdFor(recipient = recipient)
TestDbUtils.insertReactionsForThread(threadId, reactionAuthor, moduloFilter = 5)
SignalDatabase.threads.update(threadId, true)
}
private fun setupThreadDeleteGroup() {
TestUsers.setupSelf()
val groupId = TestUsers.setupGroup()
val groupRecipient = Recipient.externalGroupExact(groupId)
val threadId = SignalDatabase.threads.getOrCreateThreadIdFor(groupRecipient)
val selfId = Recipient.self().id
val memberRecipientIds = SignalDatabase.groups.getGroup(groupId).get().members.filter { it != selfId }
val messagesToAdd = 20_000
val generator = TestMessages.TimestampGenerator(System.currentTimeMillis() - (messagesToAdd * 2000L) - 60_000L)
for (i in 0 until messagesToAdd) {
val timestamp = generator.nextTimestamp()
when {
i % 4 == 0 -> TestMessages.insertOutgoingImageMessage(other = groupRecipient, attachmentCount = 1, timestamp = timestamp)
else -> {
val message = OutgoingMessage(
recipient = groupRecipient,
body = "Message $i",
timestamp = timestamp,
isSecure = true
)
val insert = SignalDatabase.messages.insertMessageOutbox(message, threadId, false, null)
SignalDatabase.messages.markAsSent(insert.messageId, true)
}
}
}
TestDbUtils.insertGroupReceiptsForThread(threadId, memberRecipientIds)
TestDbUtils.insertReactionsForThread(threadId, memberRecipientIds[0], moduloFilter = 5)
TestDbUtils.insertMentionsForThread(threadId, memberRecipientIds[0], moduloFilter = 10)
SignalDatabase.threads.update(threadId, true)
}
private fun setupGroupReceipt(includeMsl: Boolean = false, enableReadReceipts: Boolean = false) {
TestUsers.setupSelf()
val groupId = TestUsers.setupGroup()

View File

@@ -14,6 +14,62 @@ object TestDbUtils {
val rowsUpdated = database.update(MessageTable.TABLE_NAME, contentValues, DatabaseTable.ID_WHERE, buildArgs(messageId))
}
/**
* Bulk-inserts a reaction on every Nth message (by _id modulo) in the given thread.
*/
fun insertReactionsForThread(threadId: Long, authorId: RecipientId, moduloFilter: Int) {
val db = SignalDatabase.messages.databaseHelper.signalWritableDatabase
db.execSQL(
"""
INSERT INTO reaction (message_id, author_id, emoji, date_sent, date_received)
SELECT ${MessageTable.ID}, ?, '👍', ${MessageTable.DATE_SENT}, ${MessageTable.DATE_RECEIVED}
FROM ${MessageTable.TABLE_NAME}
WHERE ${MessageTable.THREAD_ID} = ? AND ${MessageTable.ID} % ? = 0
""".trimIndent(),
arrayOf(authorId.toLong().toString(), threadId.toString(), moduloFilter.toString())
)
}
/**
* Bulk-inserts group receipt rows for every message in the given thread, one row per member.
*/
fun insertGroupReceiptsForThread(threadId: Long, memberRecipientIds: List<RecipientId>) {
val db = SignalDatabase.messages.databaseHelper.signalWritableDatabase
db.beginTransaction()
try {
for (recipientId in memberRecipientIds) {
db.execSQL(
"""
INSERT INTO group_receipts (mms_id, address, status, timestamp)
SELECT ${MessageTable.ID}, ?, 2, ${MessageTable.DATE_SENT}
FROM ${MessageTable.TABLE_NAME}
WHERE ${MessageTable.THREAD_ID} = ?
""".trimIndent(),
arrayOf(recipientId.toLong().toString(), threadId.toString())
)
}
db.setTransactionSuccessful()
} finally {
db.endTransaction()
}
}
/**
* Bulk-inserts a mention on every Nth message (by _id modulo) in the given thread.
*/
fun insertMentionsForThread(threadId: Long, mentionedRecipientId: RecipientId, moduloFilter: Int) {
val db = SignalDatabase.messages.databaseHelper.signalWritableDatabase
db.execSQL(
"""
INSERT INTO mention (thread_id, message_id, recipient_id, range_start, range_length)
SELECT ${MessageTable.THREAD_ID}, ${MessageTable.ID}, ?, 0, 5
FROM ${MessageTable.TABLE_NAME}
WHERE ${MessageTable.THREAD_ID} = ? AND ${MessageTable.ID} % ? = 0
""".trimIndent(),
arrayOf(mentionedRecipientId.toLong().toString(), threadId.toString(), moduloFilter.toString())
)
}
fun getOutgoingMessageTimestamps(threadId: Long, selfRecipientId: Long): List<Long> {
val timestamps = mutableListOf<Long>()
SignalDatabase.messages.databaseHelper.signalReadableDatabase.query(

View File

@@ -141,6 +141,7 @@ import org.thoughtcrime.securesms.revealable.ViewOnceUtil
import org.thoughtcrime.securesms.sms.GroupV2UpdateMessageUtil
import org.thoughtcrime.securesms.stories.Stories.isFeatureEnabled
import org.thoughtcrime.securesms.util.JsonUtils
import org.thoughtcrime.securesms.util.SignalTrace
import org.thoughtcrime.securesms.util.MediaUtil
import org.thoughtcrime.securesms.util.MessageConstraintsUtil
import org.thoughtcrime.securesms.util.RemoteConfig
@@ -228,6 +229,7 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
const val QUOTE_TARGET_MISSING_ID = -1L
const val ADDRESSABLE_MESSAGE_LIMIT = 5
private const val DELETE_BATCH_SIZE = 1000
const val PARENT_STORY_MISSING_ID = -1L
const val PIN_FOREVER = Long.MAX_VALUE
@@ -3972,14 +3974,12 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
return 0
}
writableDatabase.withinTransaction { db ->
SignalDatabase.messageSearch.dropAfterMessageDeleteTrigger()
SignalDatabase.messageLog.dropAfterMessageDeleteTrigger()
for (threadId in threadsWithPossibleDeletes) {
val subSelect = "SELECT ${TABLE_NAME}.$ID FROM $TABLE_NAME WHERE ${TABLE_NAME}.$THREAD_ID = $threadId $extraWhere LIMIT 1000"
do {
// Bulk deleting FK tables for large message delete efficiency
SignalTrace.beginSection("MessageTable#deleteMessagesInThread")
for (threadId in threadsWithPossibleDeletes) {
val subSelect = "SELECT ${TABLE_NAME}.$ID FROM $TABLE_NAME WHERE ${TABLE_NAME}.$THREAD_ID = $threadId $extraWhere LIMIT $DELETE_BATCH_SIZE"
var deletedCount: Int
do {
deletedCount = writableDatabase.withinTransaction { db ->
db.delete(StorySendTable.TABLE_NAME)
.where("${StorySendTable.TABLE_NAME}.${StorySendTable.MESSAGE_ID} IN ($subSelect)")
.run()
@@ -3992,23 +3992,28 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
.where("${CallTable.TABLE_NAME}.${CallTable.MESSAGE_ID} IN ($subSelect)")
.run()
// Must delete rows from FTS table before deleting from main table due to FTS requirement when deleting by rowid
db.delete(SearchTable.FTS_TABLE_NAME)
.where("${SearchTable.FTS_TABLE_NAME}.${SearchTable.ID} IN ($subSelect)")
db.delete(AttachmentTable.TABLE_NAME)
.where("${AttachmentTable.TABLE_NAME}.${AttachmentTable.MESSAGE_ID} IN ($subSelect)")
.run()
// Actually delete messages
val deletedCount = db.delete(TABLE_NAME)
db.delete(GroupReceiptTable.TABLE_NAME)
.where("${GroupReceiptTable.TABLE_NAME}.${GroupReceiptTable.MMS_ID} IN ($subSelect)")
.run()
db.delete(MentionTable.TABLE_NAME)
.where("${MentionTable.TABLE_NAME}.${MentionTable.MESSAGE_ID} IN ($subSelect)")
.run()
// Delete the messages themselves
db.delete(TABLE_NAME)
.where("$ID IN ($subSelect)")
.run()
}
totalDeletedCount += deletedCount
} while (deletedCount > 0)
}
SignalDatabase.messageSearch.restoreAfterMessageDeleteTrigger()
SignalDatabase.messageLog.restoreAfterMessageDeleteTrigger()
totalDeletedCount += deletedCount
} while (deletedCount > 0)
}
SignalTrace.endSection()
return totalDeletedCount
}

View File

@@ -68,6 +68,7 @@ import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.recipients.RecipientUtil
import org.thoughtcrime.securesms.storage.StorageSyncHelper
import org.thoughtcrime.securesms.util.ConversationUtil
import org.thoughtcrime.securesms.util.SignalTrace
import org.thoughtcrime.securesms.util.JsonUtils
import org.thoughtcrime.securesms.util.JsonUtils.SaneJSONObject
import org.thoughtcrime.securesms.util.TextSecurePreferences
@@ -1315,46 +1316,43 @@ class ThreadTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTa
}
fun deleteConversations(selectedConversations: Set<Long>, syncThreadDeletes: Boolean = true) {
SignalTrace.beginSection("ThreadTable#deleteConversations")
Log.d(TAG, "[deleteConversations] Deleting ${selectedConversations.size} chats syncThreadDeletes: $syncThreadDeletes")
val recipientIds = getRecipientIdsForThreadIds(selectedConversations)
val addressableMessages = mutableListOf<ThreadDeleteSyncInfo>()
val queries: List<SqlUtil.Query> = SqlUtil.buildCollectionQuery(ID, selectedConversations)
Log.d(TAG, "[deleteConversations] Enter transaction")
writableDatabase.withinTransaction { db ->
if (syncThreadDeletes) {
for (threadId in selectedConversations) {
val mostRecentMessages = messages.getMostRecentAddressableMessages(threadId, excludeExpiring = false)
val mostRecentNonExpiring = if (mostRecentMessages.size == MessageTable.ADDRESSABLE_MESSAGE_LIMIT && mostRecentMessages.any { it.expiresIn > 0 }) {
messages.getMostRecentAddressableMessages(threadId, excludeExpiring = true)
} else {
emptySet()
}
addressableMessages += ThreadDeleteSyncInfo(threadId, mostRecentMessages, mostRecentNonExpiring)
// Phase 1: Collect sync info (reads only, before any deletion)
if (syncThreadDeletes) {
for (threadId in selectedConversations) {
val mostRecentMessages = messages.getMostRecentAddressableMessages(threadId, excludeExpiring = false)
val mostRecentNonExpiring = if (mostRecentMessages.size == MessageTable.ADDRESSABLE_MESSAGE_LIMIT && mostRecentMessages.any { it.expiresIn > 0 }) {
messages.getMostRecentAddressableMessages(threadId, excludeExpiring = true)
} else {
emptySet()
}
Log.d(TAG, "[deleteConversations] Retrieved sync thread delete addressable messages (${addressableMessages.size})")
} else {
Log.d(TAG, "[deleteConversations] No addressable messages needed")
}
Log.d(TAG, "[deleteConversations] Deactivating threads")
addressableMessages += ThreadDeleteSyncInfo(threadId, mostRecentMessages, mostRecentNonExpiring)
}
Log.d(TAG, "[deleteConversations] Retrieved sync thread delete addressable messages (${addressableMessages.size})")
} else {
Log.d(TAG, "[deleteConversations] No addressable messages needed")
}
// Phase 2: Delete messages (per-batch transactions, write lock released between batches)
Log.d(TAG, "[deleteConversations] Deleting messages in thread")
messages.deleteMessagesInThread(selectedConversations)
// Phase 3: Final lightweight transaction (deactivate threads, clear drafts, update cache)
val queries: List<SqlUtil.Query> = SqlUtil.buildCollectionQuery(ID, selectedConversations)
Log.d(TAG, "[deleteConversations] Deactivating threads and cleaning up")
writableDatabase.withinTransaction { db ->
for (query in queries) {
db.deactivateThread(query)
}
Log.d(TAG, "[deleteConversations] Deleting messages in thread")
messages.deleteMessagesInThread(selectedConversations)
Log.d(TAG, "[deleteConversations] Trimming attachments")
attachments.trimAllAbandonedAttachments()
Log.d(TAG, "[deleteConversations] Deleting abandoned group receipts")
groupReceipts.deleteAbandonedRows()
Log.d(TAG, "[deleteConversations] Deleting abandoned mentions")
mentions.deleteAbandonedMentions()
Log.d(TAG, "[deleteConversations] Clearing drafts")
drafts.clearDrafts(selectedConversations)
Log.d(TAG, "[deleteConversations] Updating threadId cache")
synchronized(threadIdCache) {
for (recipientId in recipientIds) {
threadIdCache.remove(recipientId)
@@ -1378,6 +1376,7 @@ class ThreadTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTa
ConversationUtil.clearShortcuts(context, recipientIds)
OptimizeMessageSearchIndexJob.enqueue()
SignalTrace.endSection()
}
@SuppressLint("DiscouragedApi")

View File

@@ -47,4 +47,10 @@ object BenchmarkMetrics {
get() = listOf(
TraceSectionMetric("ReceiptMessageProcessor#incrementReadReceiptCounts", Mode.Average)
)
val threadDeletion: List<TraceSectionMetric>
get() = listOf(
TraceSectionMetric("ThreadTable#deleteConversations", Mode.Sum),
TraceSectionMetric("MessageTable#deleteMessagesInThread", Mode.Sum)
)
}

View File

@@ -8,10 +8,10 @@ object BenchmarkSetup {
private const val TARGET_PACKAGE = "org.thoughtcrime.securesms.benchmark"
private const val RECEIVER = "org.signal.benchmark.BenchmarkCommandReceiver"
fun setup(type: String, device: UiDevice) {
fun setup(type: String, device: UiDevice, timeout: Long = 25_000L) {
device.executeShellCommand("pm clear $TARGET_PACKAGE")
device.executeShellCommand("am start -W -n $TARGET_PACKAGE/org.signal.benchmark.BenchmarkSetupActivity --es setup-type $type")
device.wait(Until.hasObject(By.textContains("done")), 25_000L)
device.wait(Until.hasObject(By.textContains("done")), timeout)
}
fun setupIndividualSend(device: UiDevice) {
@@ -34,6 +34,10 @@ object BenchmarkSetup {
device.benchmarkCommandBroadcast("release-messages")
}
fun deleteThread(device: UiDevice) {
device.benchmarkCommandBroadcast("delete-thread")
}
private fun UiDevice.benchmarkCommandBroadcast(command: String) {
executeShellCommand("am broadcast -a org.signal.benchmark.action.COMMAND -e command $command -n $TARGET_PACKAGE/$RECEIVER")
}

View File

@@ -0,0 +1,76 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.benchmark
import androidx.annotation.RequiresApi
import androidx.benchmark.macro.CompilationMode
import androidx.benchmark.macro.ExperimentalMetricApi
import androidx.benchmark.macro.junit4.MacrobenchmarkRule
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.uiautomator.By
import androidx.test.uiautomator.Until
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
/**
* Macrobenchmark for measuring thread deletion performance.
*
* Inserts 20,000 messages into a conversation, then measures the time
* to delete the entire thread using per-batch transactions.
*
* Two variants:
* - [deleteThread20kMessages]: 1:1 conversation with attachments and reactions
* - [deleteGroupThread20kMessages]: Group conversation with attachments, reactions,
* group receipts (5 members x 20k = 100k rows), and mentions
*/
@OptIn(ExperimentalMetricApi::class)
@RunWith(AndroidJUnit4::class)
@RequiresApi(31)
class ThreadDeletionBenchmarks {
@get:Rule
val benchmarkRule = MacrobenchmarkRule()
@Test
fun deleteThread20kMessages() {
benchmarkRule.measureRepeated(
packageName = "org.thoughtcrime.securesms.benchmark",
metrics = BenchmarkMetrics.threadDeletion,
iterations = 1,
compilationMode = CompilationMode.Partial(),
setupBlock = {
BenchmarkSetup.setup("thread-delete", device, timeout = 120_000L)
killProcess()
startActivityAndWait()
device.waitForIdle()
device.wait(Until.findObject(By.textContains("Buddy")), 10_000)
}
) {
BenchmarkSetup.deleteThread(device)
device.wait(Until.gone(By.textContains("Buddy")), 300_000L)
}
}
@Test
fun deleteGroupThread20kMessages() {
benchmarkRule.measureRepeated(
packageName = "org.thoughtcrime.securesms.benchmark",
metrics = BenchmarkMetrics.threadDeletion,
iterations = 1,
compilationMode = CompilationMode.Partial(),
setupBlock = {
BenchmarkSetup.setup("thread-delete-group", device, timeout = 180_000L)
killProcess()
startActivityAndWait()
device.waitForIdle()
device.wait(Until.findObject(By.textContains("Title")), 10_000)
}
) {
BenchmarkSetup.deleteThread(device)
device.wait(Until.gone(By.textContains("Title")), 300_000L)
}
}
}