Improve message processing performance.

This commit is contained in:
Cody Henthorne
2026-02-26 10:43:00 -05:00
committed by Greyson Parrelli
parent 7eebb38eda
commit 61ba2ac97a
11 changed files with 166 additions and 81 deletions

View File

@@ -385,6 +385,22 @@ open class SignalDatabase(private val context: Application, databaseSecret: Data
}
}
/**
* Mirrors [runInTransaction] but instead of returning the result of calling block it returns
* whether the transaction completed successfully.
*/
@JvmStatic
fun tryRunInTransaction(block: (SignalSQLiteDatabase) -> Unit): Boolean {
var committed = false
instance!!.signalWritableDatabase.withinTransaction {
block(it)
it.runPostSuccessfulTransaction { committed = true }
}
return committed
}
@get:JvmStatic
@get:JvmName("attachments")
val attachments: AttachmentTable

View File

@@ -132,18 +132,13 @@ class ReusedBatchCache : BatchCache() {
}
batchedJobs.clear()
if (threadUpdates.isNotEmpty()) {
if (threadUpdates.isNotEmpty() || mslDeletes.isNotEmpty()) {
SignalDatabase.runInTransaction {
threadUpdates.forEach { flushIncomingMessageInsertThreadUpdate(it) }
}
}
threadUpdates.clear()
if (mslDeletes.isNotEmpty()) {
SignalDatabase.runInTransaction {
mslDeletes.forEach { (key, timestamps) -> flushMslDelete(key.first, key.second, timestamps) }
}
}
threadUpdates.clear()
mslDeletes.clear()
}
}

View File

@@ -38,9 +38,11 @@ import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess.Companion.toAp
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.AlarmSleepTimer
import org.thoughtcrime.securesms.util.AppForegroundObserver
import org.thoughtcrime.securesms.util.Environment
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.thoughtcrime.securesms.util.SignalTrace
import org.thoughtcrime.securesms.util.asChain
import org.whispersystems.signalservice.api.messages.EnvelopeResponse
import org.whispersystems.signalservice.api.util.SleepTimer
import org.whispersystems.signalservice.api.util.UptimeSleepTimer
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
@@ -285,9 +287,7 @@ class IncomingMessageObserver(
fun processEnvelope(bufferedProtocolStore: BufferedProtocolStore, envelope: Envelope, serverDeliveredTimestamp: Long, batchCache: BatchCache): List<FollowUpOperation>? {
return when (envelope.type) {
Envelope.Type.SERVER_DELIVERY_RECEIPT -> {
SignalTrace.beginSection("IncomingMessageObserver#processReceipt")
processReceipt(envelope)
SignalTrace.endSection()
null
}
@@ -397,7 +397,6 @@ class IncomingMessageObserver(
private var sleepTimer: SleepTimer
private val canProcessMessages: Boolean
private val batchCache = ReusedBatchCache()
init {
Log.i(TAG, "Initializing! (${this.hashCode()})")
@@ -451,30 +450,16 @@ class IncomingMessageObserver(
val hasMore = authWebSocket.readMessageBatch(websocketReadTimeout, 30) { batch ->
Log.i(TAG, "Retrieved ${batch.size} envelopes!")
val bufferedStore = BufferedProtocolStore.create()
val startTime = System.currentTimeMillis()
GroupsV2ProcessingLock.acquireGroupProcessingLock().use {
ReentrantSessionLock.INSTANCE.acquire().use {
batch.forEach { response ->
SignalTrace.beginSection("IncomingMessageObserver#perMessageTransaction")
val followUpOperations = SignalDatabase.runInTransaction { db ->
val followUps: List<FollowUpOperation>? = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp, batchCache)
bufferedStore.flushToDisk()
followUps
}
SignalTrace.endSection()
val batchCommitted = processBatchInTransaction(batch)
if (followUpOperations?.isNotEmpty() == true) {
Log.d(TAG, "Running ${followUpOperations.size} follow-up operations...")
val jobs = followUpOperations.mapNotNull { it.run() }
AppDependencies.jobManager.addAllChains(jobs)
}
authWebSocket.sendAck(response)
if (!batchCommitted) {
Log.w(TAG, "Batch transaction rolled back, falling back to per-message processing")
processMessagesIndividually(batch)
}
batchCache.flushAndClear()
}
}
val duration = System.currentTimeMillis() - startTime
@@ -485,7 +470,9 @@ class IncomingMessageObserver(
SignalLocalMetrics.PushWebsocketFetch.onProcessedBatch()
if (!hasMore && !decryptionDrained) {
SignalTrace.endSection()
if (Environment.IS_BENCHMARK) {
SignalTrace.endSection()
}
Log.i(TAG, "Decryptions newly-drained.")
decryptionDrained = true
@@ -528,6 +515,73 @@ class IncomingMessageObserver(
Log.w(TAG, "Terminated! (${this.hashCode()})")
}
/**
* Attempts to process the entire batch in a single transaction for performance.
*
* @return true if the transaction committed, false if it the batch was rolled back.
*/
private fun processBatchInTransaction(batch: List<EnvelopeResponse>): Boolean {
val allFollowUpOperations = mutableListOf<FollowUpOperation>()
val bufferedStore = BufferedProtocolStore.create()
val batchCache = ReusedBatchCache()
val committed = SignalDatabase.tryRunInTransaction {
batch.forEach { response ->
SignalTrace.beginSection("IncomingMessageObserver#perMessageTransaction")
val followUps = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp, batchCache)
bufferedStore.flushToDisk()
SignalTrace.endSection()
if (followUps?.isNotEmpty() == true) {
allFollowUpOperations += followUps
}
}
}
if (committed) {
batchCache.flushAndClear()
if (allFollowUpOperations.isNotEmpty()) {
Log.d(TAG, "Running ${allFollowUpOperations.size} follow-up operations...")
val jobs = allFollowUpOperations.mapNotNull { it.run() }
AppDependencies.jobManager.addAllChains(jobs)
}
batch.forEach { response ->
authWebSocket.sendAck(response)
}
}
return committed
}
/**
* If something prevented us from processing the entire batch in a single transaction, we process each message individually.
*/
private fun processMessagesIndividually(batch: List<EnvelopeResponse>) {
val bufferedStore = BufferedProtocolStore.create()
val batchCache = ReusedBatchCache()
batch.forEach { response ->
SignalTrace.beginSection("IncomingMessageObserver#perMessageTransaction")
val followUpOperations = SignalDatabase.runInTransaction {
val followUps = processEnvelope(bufferedStore, response.envelope, response.serverDeliveredTimestamp, batchCache)
bufferedStore.flushToDisk()
followUps
}
SignalTrace.endSection()
if (followUpOperations?.isNotEmpty() == true) {
val jobs = followUpOperations.mapNotNull { it.run() }
AppDependencies.jobManager.addAllChains(jobs)
}
authWebSocket.sendAck(response)
}
batchCache.flushAndClear()
}
override fun uncaughtException(t: Thread, e: Throwable) {
Log.w(TAG, "Uncaught exception in message thread!", e)
}

View File

@@ -12,6 +12,7 @@ object Environment {
const val IS_NIGHTLY: Boolean = BuildConfig.BUILD_DISTRIBUTION_TYPE == "nightly"
const val IS_WEBSITE: Boolean = BuildConfig.BUILD_DISTRIBUTION_TYPE == "website"
const val IS_INSTRUMENTATION: Boolean = BuildConfig.BUILD_VARIANT_TYPE == "Instrumentation" || BuildConfig.BUILD_VARIANT_TYPE == "Benchmark"
const val IS_BENCHMARK: Boolean = BuildConfig.BUILD_VARIANT_TYPE == "Benchmark"
object Backups {
@JvmStatic