Allow SendDeliveryReceiptJob to run during benchmarking tests.

This commit is contained in:
Cody Henthorne
2026-02-13 17:01:32 -05:00
committed by Alex Hart
parent 47947b85c7
commit 58b5ebf39d
3 changed files with 165 additions and 30 deletions

View File

@@ -6,13 +6,52 @@
package org.thoughtcrime.securesms
import android.app.Application
import org.signal.benchmark.network.BenchmarkWebSocketConnection
import org.signal.libsignal.net.Network
import org.thoughtcrime.securesms.database.JobDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.dependencies.ApplicationDependencyProvider
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JobManager
import org.thoughtcrime.securesms.jobmanager.JobMigrator
import org.thoughtcrime.securesms.jobmanager.impl.FactoryJobPredicate
import org.thoughtcrime.securesms.jobs.AccountConsistencyWorkerJob
import org.thoughtcrime.securesms.jobs.ArchiveBackupIdReservationJob
import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.jobs.CreateReleaseChannelJob
import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob
import org.thoughtcrime.securesms.jobs.DownloadLatestEmojiDataJob
import org.thoughtcrime.securesms.jobs.EmojiSearchIndexDownloadJob
import org.thoughtcrime.securesms.jobs.FastJobStorage
import org.thoughtcrime.securesms.jobs.FontDownloaderJob
import org.thoughtcrime.securesms.jobs.GroupCallUpdateSendJob
import org.thoughtcrime.securesms.jobs.GroupRingCleanupJob
import org.thoughtcrime.securesms.jobs.GroupV2UpdateSelfProfileKeyJob
import org.thoughtcrime.securesms.jobs.IndividualSendJob
import org.thoughtcrime.securesms.jobs.JobManagerFactories
import org.thoughtcrime.securesms.jobs.LinkedDeviceInactiveCheckJob
import org.thoughtcrime.securesms.jobs.MarkerJob
import org.thoughtcrime.securesms.jobs.MultiDeviceProfileKeyUpdateJob
import org.thoughtcrime.securesms.jobs.PostRegistrationBackupRedemptionJob
import org.thoughtcrime.securesms.jobs.PreKeysSyncJob
import org.thoughtcrime.securesms.jobs.ProfileUploadJob
import org.thoughtcrime.securesms.jobs.PushGroupSendJob
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
import org.thoughtcrime.securesms.jobs.ReactionSendJob
import org.thoughtcrime.securesms.jobs.RefreshAttributesJob
import org.thoughtcrime.securesms.jobs.RetrieveRemoteAnnouncementsJob
import org.thoughtcrime.securesms.jobs.RotateCertificateJob
import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
import org.thoughtcrime.securesms.jobs.StorageSyncJob
import org.thoughtcrime.securesms.jobs.StoryOnboardingDownloadJob
import org.thoughtcrime.securesms.jobs.TypingSendJob
import org.thoughtcrime.securesms.net.DeviceTransferBlockingInterceptor
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.whispersystems.signalservice.api.util.UptimeSleepTimer
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
import org.whispersystems.signalservice.internal.websocket.BenchmarkWebSocketConnection
import java.util.function.Supplier
import kotlin.time.Duration.Companion.seconds
@@ -20,12 +59,8 @@ class BenchmarkApplicationContext : ApplicationContext() {
override fun initializeAppDependencies() {
AppDependencies.init(this, BenchmarkDependencyProvider(this, ApplicationDependencyProvider(this)))
}
override fun beginJobLoop() = Unit
fun beginJobLoopForTests() {
super.beginJobLoop()
DeviceTransferBlockingInterceptor.getInstance().blockNetwork()
}
override fun onForeground() = Unit
@@ -36,11 +71,97 @@ class BenchmarkApplicationContext : ApplicationContext() {
libSignalNetworkSupplier: Supplier<Network>
): SignalWebSocket.AuthenticatedWebSocket {
return SignalWebSocket.AuthenticatedWebSocket(
connectionFactory = { BenchmarkWebSocketConnection.create() },
connectionFactory = { BenchmarkWebSocketConnection.createAuthInstance() },
canConnect = { true },
sleepTimer = UptimeSleepTimer(),
disconnectTimeoutMs = 15.seconds.inWholeMilliseconds
)
}
override fun provideUnauthWebSocket(
signalServiceConfigurationSupplier: Supplier<SignalServiceConfiguration>,
libSignalNetworkSupplier: Supplier<Network>
): SignalWebSocket.UnauthenticatedWebSocket {
return SignalWebSocket.UnauthenticatedWebSocket(
connectionFactory = { BenchmarkWebSocketConnection.createUnauthInstance() },
canConnect = { true },
sleepTimer = UptimeSleepTimer(),
disconnectTimeoutMs = 15.seconds.inWholeMilliseconds
)
}
override fun provideJobManager(): JobManager {
val config = JobManager.Configuration.Builder()
.setJobFactories(filterJobFactories(JobManagerFactories.getJobFactories(application)))
.setConstraintFactories(JobManagerFactories.getConstraintFactories(application))
.setConstraintObservers(JobManagerFactories.getConstraintObservers(application))
.setJobStorage(FastJobStorage(JobDatabase.getInstance(application)))
.setJobMigrator(JobMigrator(TextSecurePreferences.getJobManagerVersion(application), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(application)))
.addReservedJobRunner(FactoryJobPredicate(PushProcessMessageJob.KEY, MarkerJob.KEY))
.addReservedJobRunner(FactoryJobPredicate(AttachmentUploadJob.KEY, AttachmentCompressionJob.KEY))
.addReservedJobRunner(
FactoryJobPredicate(
IndividualSendJob.KEY,
PushGroupSendJob.KEY,
ReactionSendJob.KEY,
TypingSendJob.KEY,
GroupCallUpdateSendJob.KEY,
SendDeliveryReceiptJob.KEY
)
)
.build()
return JobManager(application, config)
}
private fun filterJobFactories(jobFactories: Map<String, Job.Factory<*>>): Map<String, Job.Factory<*>> {
val blockedJobs = setOf(
AccountConsistencyWorkerJob.KEY,
ArchiveBackupIdReservationJob.KEY,
CreateReleaseChannelJob.KEY,
DirectoryRefreshJob.KEY,
DownloadLatestEmojiDataJob.KEY,
EmojiSearchIndexDownloadJob.KEY,
FontDownloaderJob.KEY,
GroupRingCleanupJob.KEY,
GroupV2UpdateSelfProfileKeyJob.KEY,
LinkedDeviceInactiveCheckJob.KEY,
MultiDeviceProfileKeyUpdateJob.KEY,
PostRegistrationBackupRedemptionJob.KEY,
PreKeysSyncJob.KEY,
ProfileUploadJob.KEY,
RefreshAttributesJob.KEY,
RetrieveRemoteAnnouncementsJob.KEY,
RotateCertificateJob.KEY,
StickerPackDownloadJob.KEY,
StorageSyncJob.KEY,
StoryOnboardingDownloadJob.KEY
)
return jobFactories.mapValues {
if (it.key in blockedJobs) {
NoOpJob.Factory()
} else {
it.value
}
}
}
}
private class NoOpJob(parameters: Parameters) : Job(parameters) {
companion object {
const val KEY = "NoOpJob"
}
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun run(): Result = Result.success()
override fun onFailure() = Unit
class Factory : Job.Factory<NoOpJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): NoOpJob {
return NoOpJob(parameters)
}
}
}
}

View File

@@ -11,12 +11,12 @@ import android.content.Intent
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.signal.benchmark.network.BenchmarkWebSocketConnection
import org.signal.benchmark.setup.Generator
import org.signal.benchmark.setup.Harness
import org.signal.core.util.ThreadUtil
import org.signal.core.util.logging.Log
import org.whispersystems.signalservice.internal.push.Envelope
import org.whispersystems.signalservice.internal.websocket.BenchmarkWebSocketConnection
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import kotlin.random.Random
@@ -46,8 +46,8 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
"individual-send" -> handlePrepareIndividualSend()
"group-send" -> handlePrepareGroupSend()
"release-messages" -> {
BenchmarkWebSocketConnection.instance.startWholeBatchTrace = true
BenchmarkWebSocketConnection.instance.releaseMessages()
BenchmarkWebSocketConnection.authInstance.startWholeBatchTrace = true
BenchmarkWebSocketConnection.authInstance.releaseMessages()
}
else -> Log.w(TAG, "Unknown command: $command")
}
@@ -61,7 +61,7 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
runBlocking {
launch(Dispatchers.IO) {
BenchmarkWebSocketConnection.instance.run {
BenchmarkWebSocketConnection.authInstance.run {
Log.i(TAG, "Sending initial message form Bob to establish session.")
addPendingMessages(listOf(encryptedEnvelope.toWebSocketPayload()))
releaseMessages()
@@ -78,8 +78,8 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
val messages = envelopes.map { e -> e.toWebSocketPayload() }
BenchmarkWebSocketConnection.instance.addPendingMessages(messages)
BenchmarkWebSocketConnection.instance.addQueueEmptyMessage()
BenchmarkWebSocketConnection.authInstance.addPendingMessages(messages)
BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage()
}
private fun handlePrepareGroupSend() {
@@ -90,7 +90,7 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
runBlocking {
launch(Dispatchers.IO) {
BenchmarkWebSocketConnection.instance.run {
BenchmarkWebSocketConnection.authInstance.run {
Log.i(TAG, "Sending initial group messages from client to establish sessions.")
addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() })
releaseMessages()
@@ -108,9 +108,9 @@ class BenchmarkCommandReceiver : BroadcastReceiver() {
val messages = envelopes.map { e -> e.toWebSocketPayload() }
BenchmarkWebSocketConnection.instance.addPendingMessages(messages)
BenchmarkWebSocketConnection.authInstance.addPendingMessages(messages)
}
BenchmarkWebSocketConnection.instance.addQueueEmptyMessage()
BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage()
}
private fun Envelope.toWebSocketPayload(): WebSocketRequestMessage {

View File

@@ -3,18 +3,15 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.benchmark.network
package org.whispersystems.signalservice.internal.websocket
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.subjects.BehaviorSubject
import okio.IOException
import org.thoughtcrime.securesms.util.JsonUtils
import org.thoughtcrime.securesms.util.SignalTrace
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.internal.websocket.WebSocketConnection
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage
import org.whispersystems.signalservice.internal.websocket.WebsocketResponse
import org.whispersystems.signalservice.internal.push.SendMessageResponse
import java.net.SocketException
import java.util.LinkedList
import java.util.Optional
@@ -32,13 +29,22 @@ import java.util.concurrent.TimeoutException
class BenchmarkWebSocketConnection : WebSocketConnection {
companion object {
lateinit var instance: BenchmarkWebSocketConnection
lateinit var authInstance: BenchmarkWebSocketConnection
private set
@Synchronized
fun create(): WebSocketConnection {
instance = BenchmarkWebSocketConnection()
return instance
fun createAuthInstance(): WebSocketConnection {
authInstance = BenchmarkWebSocketConnection()
return authInstance
}
lateinit var unauthInstance: BenchmarkWebSocketConnection
private set
@Synchronized
fun createUnauthInstance(): WebSocketConnection {
unauthInstance = BenchmarkWebSocketConnection()
return unauthInstance
}
}
@@ -118,12 +124,16 @@ class BenchmarkWebSocketConnection : WebSocketConnection {
request: WebSocketRequestMessage,
timeoutSeconds: Long
): Single<WebsocketResponse> {
return Single.error(IOException("fake timeout"))
if (request.verb != null && request.path != null) {
if (request.verb == "PUT" && request.path!!.startsWith("/v1/messages/")) {
return Single.just(WebsocketResponse(200, SendMessageResponse().toJson(), emptyList<String>(), true))
}
}
return Single.error(okio.IOException("fake timeout"))
}
override fun sendKeepAlive() {
error("Not yet implemented")
}
override fun sendKeepAlive() = Unit
fun addQueueEmptyMessage() {
addPendingMessages(
@@ -136,3 +146,7 @@ class BenchmarkWebSocketConnection : WebSocketConnection {
)
}
}
private fun Any.toJson(): String {
return JsonUtils.toJson(this)
}