/* * Copyright 2026 Signal Messenger, LLC * SPDX-License-Identifier: AGPL-3.0-only */ package org.signal.benchmark import android.content.BroadcastReceiver import android.content.Context import android.content.Intent import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.signal.benchmark.setup.Generator import org.signal.benchmark.setup.Harness import org.signal.benchmark.setup.OtherClient import org.signal.core.util.ThreadUtil import org.signal.core.util.logging.Log import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.TestDbUtils import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.groups.GroupId import org.thoughtcrime.securesms.recipients.Recipient import org.whispersystems.signalservice.internal.push.Envelope import org.whispersystems.signalservice.internal.websocket.BenchmarkWebSocketConnection import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage import kotlin.random.Random /** * A BroadcastReceiver that accepts commands sent from the benchmark app to perform * background operations on the client. */ class BenchmarkCommandReceiver : BroadcastReceiver() { companion object { private val TAG = Log.tag(BenchmarkCommandReceiver::class) const val ACTION_COMMAND = "org.signal.benchmark.action.COMMAND" const val EXTRA_COMMAND = "command" } override fun onReceive(context: Context, intent: Intent) { if (intent.action != ACTION_COMMAND) { Log.w(TAG, "Ignoring unknown action: ${intent.action}") return } val command = intent.getStringExtra(EXTRA_COMMAND) Log.i(TAG, "Received command: $command") when (command) { "individual-send" -> handlePrepareIndividualSend() "group-send" -> handlePrepareGroupSend() "group-delivery-receipt" -> handlePrepareGroupReceipts { client, timestamps -> client.generateInboundDeliveryReceipts(timestamps) } "group-read-receipt" -> handlePrepareGroupReceipts { client, timestamps -> client.generateInboundReadReceipts(timestamps) } "release-messages" -> { BenchmarkWebSocketConnection.startWholeBatchTrace() BenchmarkWebSocketConnection.releaseMessages() } "delete-thread" -> { val pendingResult = goAsync() Thread { handleDeleteThread() pendingResult.finish() }.start() } else -> Log.w(TAG, "Unknown command: $command") } } private fun handlePrepareIndividualSend() { val client = Harness.otherClients[0] // Send message from Bob to Self val encryptedEnvelope = client.encrypt(Generator.encryptedTextMessage(System.currentTimeMillis())) runBlocking { launch(Dispatchers.IO) { Log.i(TAG, "Sending initial message from Bob to establish session.") BenchmarkWebSocketConnection.addPendingMessages(listOf(encryptedEnvelope.toWebSocketPayload())) BenchmarkWebSocketConnection.releaseMessages() // Sleep briefly to let the message be processed. ThreadUtil.sleep(1000) } } // Complete the session handshake so both sides have a proper double-ratchet session Log.i(TAG, "Completing session handshake with reply.") client.completeSession() // Have Bob generate N messages that will be received by Alice val messageCount = 500 val envelopes = client.generateInboundEnvelopes(messageCount) val messages = envelopes.map { e -> e.toWebSocketPayload() } BenchmarkWebSocketConnection.addPendingMessages(messages) BenchmarkWebSocketConnection.addQueueEmptyMessage() } private fun handlePrepareGroupSend() { val clients = Harness.otherClients.take(5) // Send message from others to Self in the group val encryptedEnvelopes = clients.map { it.encrypt(Generator.encryptedTextMessage(System.currentTimeMillis(), groupMasterKey = Harness.groupMasterKey)) } runBlocking { launch(Dispatchers.IO) { Log.i(TAG, "Sending initial group messages from clients to establish sessions.") BenchmarkWebSocketConnection.addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) BenchmarkWebSocketConnection.releaseMessages() // Sleep briefly to let the messages be processed. ThreadUtil.sleep(1000) } } // Complete session handshakes so both sides have proper double-ratchet sessions Log.i(TAG, "Completing session handshakes with Alice replies.") clients.forEach { it.completeSession() } // Have clients generate N group messages that will be received by Alice val allClientMessages = clients.map { client -> val messageCount = 100 val envelopes = client.generateInboundGroupEnvelopes(messageCount, Harness.groupMasterKey) envelopes.map { e -> e.toWebSocketPayload() } } BenchmarkWebSocketConnection.addPendingMessages(interleave(allClientMessages)) BenchmarkWebSocketConnection.addQueueEmptyMessage() } private fun handlePrepareGroupReceipts(generateReceipts: (OtherClient, List) -> List) { val clients = Harness.otherClients.take(5) establishGroupSessions(clients) val timestamps = getOutgoingGroupMessageTimestamps() Log.i(TAG, "Found ${timestamps.size} outgoing message timestamps for receipts") val allClientEnvelopes = clients.map { client -> generateReceipts(client, timestamps).map { it.toWebSocketPayload() } } BenchmarkWebSocketConnection.addPendingMessages(interleave(allClientEnvelopes)) BenchmarkWebSocketConnection.addQueueEmptyMessage() } private fun establishGroupSessions(clients: List) { val encryptedEnvelopes = clients.map { it.encrypt(Generator.encryptedTextMessage(System.currentTimeMillis(), groupMasterKey = Harness.groupMasterKey)) } runBlocking { launch(Dispatchers.IO) { Log.i(TAG, "Sending initial group messages from clients to establish sessions.") BenchmarkWebSocketConnection.addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) BenchmarkWebSocketConnection.releaseMessages() ThreadUtil.sleep(1000) } } Log.i(TAG, "Completing session handshakes with Alice replies.") clients.forEach { it.completeSession() } } private fun handleDeleteThread() { val threadId = SignalDatabase.rawDatabase.rawQuery("SELECT thread_id, COUNT(*) AS msg_count FROM message GROUP BY thread_id ORDER BY msg_count DESC LIMIT 1").use { cursor -> if (cursor.moveToFirst()) { val id = cursor.getLong(0) val count = cursor.getLong(1) Log.i(TAG, "Found largest thread $id with $count messages") id } else { Log.w(TAG, "No threads found for deletion benchmark") return } } val recipientName = SignalDatabase.threads.getRecipientForThreadId(threadId) ?.getDisplayName(AppDependencies.application) ?: "unknown" Log.i(TAG, "Deleting thread $threadId (recipient: $recipientName)") SignalDatabase.threads.deleteConversation(threadId, syncThreadDelete = false) Log.i(TAG, "Thread $threadId deleted (recipient: $recipientName)") } private fun getOutgoingGroupMessageTimestamps(): List { val groupId = GroupId.v2(Harness.groupMasterKey) val groupRecipient = Recipient.externalGroupExact(groupId) val threadId = SignalDatabase.threads.getOrCreateThreadIdFor(groupRecipient) val selfId = Recipient.self().id.toLong() return TestDbUtils.getOutgoingMessageTimestamps(threadId, selfId) } /** * Interleaves lists so that items from different lists alternate: * [[a1, a2], [b1, b2], [c1, c2]] -> [a1, b1, c1, a2, b2, c2] */ private fun interleave(lists: List>): List { val result = mutableListOf() val maxSize = lists.maxOf { it.size } for (i in 0 until maxSize) { for (list in lists) { if (i < list.size) { result += list[i] } } } return result } private fun Envelope.toWebSocketPayload(): WebSocketRequestMessage { return WebSocketRequestMessage( verb = "PUT", path = "/api/v1/message", id = Random.nextLong(), headers = listOf("X-Signal-Timestamp: ${this.timestamp}"), body = this.encodeByteString() ) } }