From 61ba2ac97a2735c69136ddbe308b29415b59c131 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Thu, 26 Feb 2026 10:43:00 -0500 Subject: [PATCH] Improve message processing performance. --- .../benchmark/BenchmarkCommandReceiver.kt | 59 +++++------ .../websocket/BenchmarkWebSocketConnection.kt | 36 +++++-- .../securesms/database/SignalDatabase.kt | 16 +++ .../securesms/messages/BatchCache.kt | 9 +- .../messages/IncomingMessageObserver.kt | 98 ++++++++++++++----- .../securesms/util/Environment.kt | 1 + .../benchmark/BenchmarkMetrics.kt | 8 +- .../benchmark/ConversationBenchmarks.kt | 2 +- .../GroupMessageProcessingBenchmarks.kt | 8 +- .../benchmark/MessageProcessingBenchmarks.kt | 6 +- .../benchmark/StartupBenchmarks.kt | 4 +- 11 files changed, 166 insertions(+), 81 deletions(-) diff --git a/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt b/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt index 9fa33fa4ba..232905e8dd 100644 --- a/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt +++ b/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt @@ -53,8 +53,8 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { "group-delivery-receipt" -> handlePrepareGroupReceipts { client, timestamps -> client.generateInboundDeliveryReceipts(timestamps) } "group-read-receipt" -> handlePrepareGroupReceipts { client, timestamps -> client.generateInboundReadReceipts(timestamps) } "release-messages" -> { - BenchmarkWebSocketConnection.authInstance.startWholeBatchTrace = true - BenchmarkWebSocketConnection.authInstance.releaseMessages() + BenchmarkWebSocketConnection.startWholeBatchTrace() + BenchmarkWebSocketConnection.releaseMessages() } else -> Log.w(TAG, "Unknown command: $command") } @@ -68,25 +68,23 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { runBlocking { launch(Dispatchers.IO) { - BenchmarkWebSocketConnection.authInstance.run { - Log.i(TAG, "Sending initial message form Bob to establish session.") - addPendingMessages(listOf(encryptedEnvelope.toWebSocketPayload())) - releaseMessages() + Log.i(TAG, "Sending initial message form Bob to establish session.") + BenchmarkWebSocketConnection.addPendingMessages(listOf(encryptedEnvelope.toWebSocketPayload())) + BenchmarkWebSocketConnection.releaseMessages() - // Sleep briefly to let the message be processed. - ThreadUtil.sleep(100) - } + // Sleep briefly to let the message be processed. + ThreadUtil.sleep(1000) } } // Have Bob generate N messages that will be received by Alice - val messageCount = 100 + val messageCount = 500 val envelopes = client.generateInboundEnvelopes(messageCount) val messages = envelopes.map { e -> e.toWebSocketPayload() } - BenchmarkWebSocketConnection.authInstance.addPendingMessages(messages) - BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage() + BenchmarkWebSocketConnection.addPendingMessages(messages) + BenchmarkWebSocketConnection.addQueueEmptyMessage() } private fun handlePrepareGroupSend() { @@ -97,27 +95,24 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { runBlocking { launch(Dispatchers.IO) { - BenchmarkWebSocketConnection.authInstance.run { - Log.i(TAG, "Sending initial group messages from client to establish sessions.") - addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) - releaseMessages() + Log.i(TAG, "Sending initial group messages from client to establish sessions.") + BenchmarkWebSocketConnection.addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) + BenchmarkWebSocketConnection.releaseMessages() - // Sleep briefly to let the messages be processed. - ThreadUtil.sleep(1000) - } + // Sleep briefly to let the messages be processed. + ThreadUtil.sleep(1000) } } // Have clients generate N group messages that will be received by Alice - clients.forEach { client -> + val allClientMessages = clients.map { client -> val messageCount = 100 val envelopes = client.generateInboundGroupEnvelopes(messageCount, Harness.groupMasterKey) - - val messages = envelopes.map { e -> e.toWebSocketPayload() } - - BenchmarkWebSocketConnection.authInstance.addPendingMessages(messages) + envelopes.map { e -> e.toWebSocketPayload() } } - BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage() + + BenchmarkWebSocketConnection.addPendingMessages(interleave(allClientMessages)) + BenchmarkWebSocketConnection.addQueueEmptyMessage() } private fun handlePrepareGroupReceipts(generateReceipts: (OtherClient, List) -> List) { @@ -132,8 +127,8 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { generateReceipts(client, timestamps).map { it.toWebSocketPayload() } } - BenchmarkWebSocketConnection.authInstance.addPendingMessages(interleave(allClientEnvelopes)) - BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage() + BenchmarkWebSocketConnection.addPendingMessages(interleave(allClientEnvelopes)) + BenchmarkWebSocketConnection.addQueueEmptyMessage() } private fun establishGroupSessions(clients: List) { @@ -141,12 +136,10 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { runBlocking { launch(Dispatchers.IO) { - BenchmarkWebSocketConnection.authInstance.run { - Log.i(TAG, "Sending initial group messages from clients to establish sessions.") - addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) - releaseMessages() - ThreadUtil.sleep(1000) - } + Log.i(TAG, "Sending initial group messages from clients to establish sessions.") + BenchmarkWebSocketConnection.addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) + BenchmarkWebSocketConnection.releaseMessages() + ThreadUtil.sleep(1000) } } } diff --git a/app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt b/app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt index 9dd54f4c3a..211d9d7f1a 100644 --- a/app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt +++ b/app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt @@ -29,23 +29,42 @@ import java.util.concurrent.TimeoutException class BenchmarkWebSocketConnection : WebSocketConnection { companion object { - lateinit var authInstance: BenchmarkWebSocketConnection - private set + private val authInstances = mutableListOf() + private val unauthInstances = mutableListOf() @Synchronized fun createAuthInstance(): WebSocketConnection { - authInstance = BenchmarkWebSocketConnection() + val authInstance = BenchmarkWebSocketConnection() + authInstances += authInstance return authInstance } - lateinit var unauthInstance: BenchmarkWebSocketConnection - private set - @Synchronized fun createUnauthInstance(): WebSocketConnection { - unauthInstance = BenchmarkWebSocketConnection() + val unauthInstance = BenchmarkWebSocketConnection() + unauthInstances += unauthInstance return unauthInstance } + + @Synchronized + fun startWholeBatchTrace() { + authInstances.filterNot(BenchmarkWebSocketConnection::isShutdown).forEach { it.startWholeBatchTrace = true } + } + + @Synchronized + fun releaseMessages() { + authInstances.filterNot(BenchmarkWebSocketConnection::isShutdown).forEach { it.releaseMessages() } + } + + @Synchronized + fun addPendingMessages(messages: List) { + authInstances.filterNot(BenchmarkWebSocketConnection::isShutdown).forEach { it.addPendingMessages(messages) } + } + + @Synchronized + fun addQueueEmptyMessage() { + authInstances.filterNot(BenchmarkWebSocketConnection::isShutdown).forEach { it.addQueueEmptyMessage() } + } } override val name: String = "bench-${System.identityHashCode(this)}" @@ -58,7 +77,8 @@ class BenchmarkWebSocketConnection : WebSocketConnection { var startWholeBatchTrace = false @Volatile - private var isShutdown = false + var isShutdown = false + private set override fun connect(): Observable { state.onNext(WebSocketConnectionState.CONNECTED) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt index 3fdbd58016..cc2f654011 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/SignalDatabase.kt @@ -385,6 +385,22 @@ open class SignalDatabase(private val context: Application, databaseSecret: Data } } + /** + * Mirrors [runInTransaction] but instead of returning the result of calling block it returns + * whether the transaction completed successfully. + */ + @JvmStatic + fun tryRunInTransaction(block: (SignalSQLiteDatabase) -> Unit): Boolean { + var committed = false + + instance!!.signalWritableDatabase.withinTransaction { + block(it) + it.runPostSuccessfulTransaction { committed = true } + } + + return committed + } + @get:JvmStatic @get:JvmName("attachments") val attachments: AttachmentTable diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/BatchCache.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/BatchCache.kt index 12046d5285..24002aef91 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/BatchCache.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/BatchCache.kt @@ -132,18 +132,13 @@ class ReusedBatchCache : BatchCache() { } batchedJobs.clear() - if (threadUpdates.isNotEmpty()) { + if (threadUpdates.isNotEmpty() || mslDeletes.isNotEmpty()) { SignalDatabase.runInTransaction { threadUpdates.forEach { flushIncomingMessageInsertThreadUpdate(it) } - } - } - threadUpdates.clear() - - if (mslDeletes.isNotEmpty()) { - SignalDatabase.runInTransaction { mslDeletes.forEach { (key, timestamps) -> flushMslDelete(key.first, key.second, timestamps) } } } + threadUpdates.clear() mslDeletes.clear() } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index 81c3043b31..dbebf6e386 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -38,9 +38,11 @@ import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess.Companion.toAp import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.util.AlarmSleepTimer import org.thoughtcrime.securesms.util.AppForegroundObserver +import org.thoughtcrime.securesms.util.Environment import org.thoughtcrime.securesms.util.SignalLocalMetrics import org.thoughtcrime.securesms.util.SignalTrace import org.thoughtcrime.securesms.util.asChain +import org.whispersystems.signalservice.api.messages.EnvelopeResponse import org.whispersystems.signalservice.api.util.SleepTimer import org.whispersystems.signalservice.api.util.UptimeSleepTimer import org.whispersystems.signalservice.api.websocket.SignalWebSocket @@ -285,9 +287,7 @@ class IncomingMessageObserver( fun processEnvelope(bufferedProtocolStore: BufferedProtocolStore, envelope: Envelope, serverDeliveredTimestamp: Long, batchCache: BatchCache): List? { return when (envelope.type) { Envelope.Type.SERVER_DELIVERY_RECEIPT -> { - SignalTrace.beginSection("IncomingMessageObserver#processReceipt") processReceipt(envelope) - SignalTrace.endSection() null } @@ -397,7 +397,6 @@ class IncomingMessageObserver( private var sleepTimer: SleepTimer private val canProcessMessages: Boolean - private val batchCache = ReusedBatchCache() init { Log.i(TAG, "Initializing! (${this.hashCode()})") @@ -451,30 +450,16 @@ class IncomingMessageObserver( val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch -> Log.i(TAG, "Retrieved ${batch.size} envelopes!") - val bufferedStore = BufferedProtocolStore.create() val startTime = System.currentTimeMillis() GroupsV2ProcessingLock.acquireGroupProcessingLock().use { ReentrantSessionLock.INSTANCE.acquire().use { - batch.forEach { response -> - SignalTrace.beginSection("IncomingMessageObserver#perMessageTransaction") - val followUpOperations = SignalDatabase.runInTransaction { db -> - val followUps: List? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp, batchCache) - bufferedStore.flushToDisk() - followUps - } - SignalTrace.endSection() + val batchCommitted = processBatchInTransaction(batch) - if (followUpOperations?.isNotEmpty() == true) { - Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...") - val jobs = followUpOperations.mapNotNull { it.run() } - AppDependencies.jobManager.addAllChains(jobs) - } - - authWebSocket.sendAck(response) + if (!batchCommitted) { + Log.w(TAG, "Batch transaction rolled back, falling back to per-message processing") + processMessagesIndividually(batch) } - - batchCache.flushAndClear() } } val duration = System.currentTimeMillis() - startTime @@ -485,7 +470,9 @@ class IncomingMessageObserver( SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch() if (!hasMore && !decryptionDrained) { - SignalTrace.endSection() + if (Environment.IS_BENCHMARK) { + SignalTrace.endSection() + } Log.i(TAG, "Decryptions newly-drained.") decryptionDrained = true @@ -528,6 +515,73 @@ class IncomingMessageObserver( Log.w(TAG, "Terminated! (${this.hashCode()})") } + /** + * Attempts to process the entire batch in a single transaction for performance. + * + * @return true if the transaction committed, false if it the batch was rolled back. + */ + private fun processBatchInTransaction(batch: List): Boolean { + val allFollowUpOperations = mutableListOf() + val bufferedStore = BufferedProtocolStore.create() + val batchCache = ReusedBatchCache() + + val committed = SignalDatabase.tryRunInTransaction { + batch.forEach { response -> + SignalTrace.beginSection("IncomingMessageObserver#perMessageTransaction") + val followUps = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp, batchCache) + bufferedStore.flushToDisk() + SignalTrace.endSection() + + if (followUps?.isNotEmpty() == true) { + allFollowUpOperations += followUps + } + } + } + + if (committed) { + batchCache.flushAndClear() + + if (allFollowUpOperations.isNotEmpty()) { + Log.d(TAG, "Running ${allFollowUpOperations.size} follow-up operations...") + val jobs = allFollowUpOperations.mapNotNull { it.run() } + AppDependencies.jobManager.addAllChains(jobs) + } + + batch.forEach { response -> + authWebSocket.sendAck(response) + } + } + + return committed + } + + /** + * If something prevented us from processing the entire batch in a single transaction, we process each message individually. + */ + private fun processMessagesIndividually(batch: List) { + val bufferedStore = BufferedProtocolStore.create() + val batchCache = ReusedBatchCache() + + batch.forEach { response -> + SignalTrace.beginSection("IncomingMessageObserver#perMessageTransaction") + val followUpOperations = SignalDatabase.runInTransaction { + val followUps = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp, batchCache) + bufferedStore.flushToDisk() + followUps + } + SignalTrace.endSection() + + if (followUpOperations?.isNotEmpty() == true) { + val jobs = followUpOperations.mapNotNull { it.run() } + AppDependencies.jobManager.addAllChains(jobs) + } + + authWebSocket.sendAck(response) + } + + batchCache.flushAndClear() + } + override fun uncaughtException(t: Thread, e: Throwable) { Log.w(TAG, "Uncaught exception in message thread!", e) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/Environment.kt b/app/src/main/java/org/thoughtcrime/securesms/util/Environment.kt index fc1ebb0172..f0465c3845 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/Environment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/util/Environment.kt @@ -12,6 +12,7 @@ object Environment { const val IS_NIGHTLY: Boolean = BuildConfig.BUILD_DISTRIBUTION_TYPE == "nightly" const val IS_WEBSITE: Boolean = BuildConfig.BUILD_DISTRIBUTION_TYPE == "website" const val IS_INSTRUMENTATION: Boolean = BuildConfig.BUILD_VARIANT_TYPE == "Instrumentation" || BuildConfig.BUILD_VARIANT_TYPE == "Benchmark" + const val IS_BENCHMARK: Boolean = BuildConfig.BUILD_VARIANT_TYPE == "Benchmark" object Backups { @JvmStatic diff --git a/benchmark/src/main/java/org/thoughtcrime/benchmark/BenchmarkMetrics.kt b/benchmark/src/main/java/org/thoughtcrime/benchmark/BenchmarkMetrics.kt index f22cc8a696..486183c200 100644 --- a/benchmark/src/main/java/org/thoughtcrime/benchmark/BenchmarkMetrics.kt +++ b/benchmark/src/main/java/org/thoughtcrime/benchmark/BenchmarkMetrics.kt @@ -20,13 +20,19 @@ object BenchmarkMetrics { TraceSectionMetric("IncomingMessageObserver#totalProcessing", Mode.Sum) ) - val dataMessageProcessor: List + val groupDataMessageProcessor: List get() = listOf( TraceSectionMetric("DataMessageProcessor#gv2PreProcessing", Mode.Average), TraceSectionMetric("DataMessageProcessor#messageInsert", Mode.Average), TraceSectionMetric("DataMessageProcessor#postProcess", Mode.Average) ) + val individualDataMessageProcessor: List + get() = listOf( + TraceSectionMetric("DataMessageProcessor#messageInsert", Mode.Average), + TraceSectionMetric("DataMessageProcessor#postProcess", Mode.Average) + ) + val messageContentProcessor: List get() = listOf( TraceSectionMetric("MessageContentProcessor#handleMessage", Mode.Average) diff --git a/benchmark/src/main/java/org/thoughtcrime/benchmark/ConversationBenchmarks.kt b/benchmark/src/main/java/org/thoughtcrime/benchmark/ConversationBenchmarks.kt index 64bd1a6d6f..5aad99be57 100644 --- a/benchmark/src/main/java/org/thoughtcrime/benchmark/ConversationBenchmarks.kt +++ b/benchmark/src/main/java/org/thoughtcrime/benchmark/ConversationBenchmarks.kt @@ -34,7 +34,7 @@ class ConversationBenchmarks { TraceSectionMetric("4-ConversationOpen-Data-Posted"), TraceSectionMetric("5-ConversationOpen-Render"), ), - iterations = 10, + iterations = 3, compilationMode = CompilationMode.Partial(), setupBlock = { if (!setup) { diff --git a/benchmark/src/main/java/org/thoughtcrime/benchmark/GroupMessageProcessingBenchmarks.kt b/benchmark/src/main/java/org/thoughtcrime/benchmark/GroupMessageProcessingBenchmarks.kt index 4f637c7196..07d7350e8b 100644 --- a/benchmark/src/main/java/org/thoughtcrime/benchmark/GroupMessageProcessingBenchmarks.kt +++ b/benchmark/src/main/java/org/thoughtcrime/benchmark/GroupMessageProcessingBenchmarks.kt @@ -41,8 +41,8 @@ class GroupMessageProcessingBenchmarks { private fun runGroupMessageReceive(withConversationOpen: Boolean) { benchmarkRule.measureRepeated( packageName = "org.thoughtcrime.securesms.benchmark", - metrics = BenchmarkMetrics.incomingMessageObserver + BenchmarkMetrics.messageContentProcessor + BenchmarkMetrics.dataMessageProcessor, - iterations = 5, + metrics = BenchmarkMetrics.incomingMessageObserver + BenchmarkMetrics.messageContentProcessor + BenchmarkMetrics.groupDataMessageProcessor, + iterations = 3, compilationMode = CompilationMode.Partial(), setupBlock = { setupGroup("group-message-send", BenchmarkSetup::setupGroupSend, withConversationOpen) @@ -69,7 +69,7 @@ class GroupMessageProcessingBenchmarks { benchmarkRule.measureRepeated( packageName = "org.thoughtcrime.securesms.benchmark", metrics = BenchmarkMetrics.incomingMessageObserver + BenchmarkMetrics.messageContentProcessor + BenchmarkMetrics.deliveryReceipt, - iterations = 5, + iterations = 3, compilationMode = CompilationMode.Partial(), setupBlock = { setupGroup("group-delivery-receipt", BenchmarkSetup::setupGroupDeliveryReceipt, withConversationOpen) @@ -95,7 +95,7 @@ class GroupMessageProcessingBenchmarks { benchmarkRule.measureRepeated( packageName = "org.thoughtcrime.securesms.benchmark", metrics = BenchmarkMetrics.incomingMessageObserver + BenchmarkMetrics.messageContentProcessor + BenchmarkMetrics.readReceipt, - iterations = 5, + iterations = 3, compilationMode = CompilationMode.Partial(), setupBlock = { setupGroup("group-read-receipt", BenchmarkSetup::setupGroupReadReceipt, withConversationOpen) diff --git a/benchmark/src/main/java/org/thoughtcrime/benchmark/MessageProcessingBenchmarks.kt b/benchmark/src/main/java/org/thoughtcrime/benchmark/MessageProcessingBenchmarks.kt index be576d7d95..9c4c3ec7fb 100644 --- a/benchmark/src/main/java/org/thoughtcrime/benchmark/MessageProcessingBenchmarks.kt +++ b/benchmark/src/main/java/org/thoughtcrime/benchmark/MessageProcessingBenchmarks.kt @@ -39,8 +39,8 @@ class MessageProcessingBenchmarks { private fun run(withConversationOpen: Boolean) { benchmarkRule.measureRepeated( packageName = "org.thoughtcrime.securesms.benchmark", - metrics = BenchmarkMetrics.incomingMessageObserver + BenchmarkMetrics.messageContentProcessor, - iterations = 5, + metrics = BenchmarkMetrics.incomingMessageObserver + BenchmarkMetrics.messageContentProcessor + BenchmarkMetrics.individualDataMessageProcessor, + iterations = 3, compilationMode = CompilationMode.Partial(), setupBlock = { BenchmarkSetup.setup("message-send", device) @@ -60,7 +60,7 @@ class MessageProcessingBenchmarks { BenchmarkSetup.releaseMessages(device) - device.wait(Until.hasObject(By.textContains("101")), 10_000L) + device.wait(Until.hasObject(By.textContains("501")), 10_000L) } } } diff --git a/benchmark/src/main/java/org/thoughtcrime/benchmark/StartupBenchmarks.kt b/benchmark/src/main/java/org/thoughtcrime/benchmark/StartupBenchmarks.kt index 31bd401b79..a37c7156a9 100644 --- a/benchmark/src/main/java/org/thoughtcrime/benchmark/StartupBenchmarks.kt +++ b/benchmark/src/main/java/org/thoughtcrime/benchmark/StartupBenchmarks.kt @@ -24,12 +24,12 @@ class StartupBenchmarks { @Test fun coldStartNone() { - measureStartup(5, CompilationMode.None()) + measureStartup(3, CompilationMode.None()) } @Test fun coldStartBaselineProfile() { - measureStartup(5, CompilationMode.Partial(BaselineProfileMode.Require)) + measureStartup(3, CompilationMode.Partial(BaselineProfileMode.Require)) } @OptIn(ExperimentalMetricApi::class)