diff --git a/app/src/benchmark/java/org/thoughtcrime/securesms/BenchmarkApplicationContext.kt b/app/src/benchmark/java/org/thoughtcrime/securesms/BenchmarkApplicationContext.kt index 953031c862..fe7eb85ce9 100644 --- a/app/src/benchmark/java/org/thoughtcrime/securesms/BenchmarkApplicationContext.kt +++ b/app/src/benchmark/java/org/thoughtcrime/securesms/BenchmarkApplicationContext.kt @@ -6,13 +6,52 @@ package org.thoughtcrime.securesms import android.app.Application -import org.signal.benchmark.network.BenchmarkWebSocketConnection import org.signal.libsignal.net.Network +import org.thoughtcrime.securesms.database.JobDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.dependencies.ApplicationDependencyProvider +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.JobManager +import org.thoughtcrime.securesms.jobmanager.JobMigrator +import org.thoughtcrime.securesms.jobmanager.impl.FactoryJobPredicate +import org.thoughtcrime.securesms.jobs.AccountConsistencyWorkerJob +import org.thoughtcrime.securesms.jobs.ArchiveBackupIdReservationJob +import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob +import org.thoughtcrime.securesms.jobs.AttachmentUploadJob +import org.thoughtcrime.securesms.jobs.CreateReleaseChannelJob +import org.thoughtcrime.securesms.jobs.DirectoryRefreshJob +import org.thoughtcrime.securesms.jobs.DownloadLatestEmojiDataJob +import org.thoughtcrime.securesms.jobs.EmojiSearchIndexDownloadJob +import org.thoughtcrime.securesms.jobs.FastJobStorage +import org.thoughtcrime.securesms.jobs.FontDownloaderJob +import org.thoughtcrime.securesms.jobs.GroupCallUpdateSendJob +import org.thoughtcrime.securesms.jobs.GroupRingCleanupJob +import org.thoughtcrime.securesms.jobs.GroupV2UpdateSelfProfileKeyJob +import org.thoughtcrime.securesms.jobs.IndividualSendJob +import org.thoughtcrime.securesms.jobs.JobManagerFactories +import org.thoughtcrime.securesms.jobs.LinkedDeviceInactiveCheckJob +import org.thoughtcrime.securesms.jobs.MarkerJob +import org.thoughtcrime.securesms.jobs.MultiDeviceProfileKeyUpdateJob +import org.thoughtcrime.securesms.jobs.PostRegistrationBackupRedemptionJob +import org.thoughtcrime.securesms.jobs.PreKeysSyncJob +import org.thoughtcrime.securesms.jobs.ProfileUploadJob +import org.thoughtcrime.securesms.jobs.PushGroupSendJob +import org.thoughtcrime.securesms.jobs.PushProcessMessageJob +import org.thoughtcrime.securesms.jobs.ReactionSendJob +import org.thoughtcrime.securesms.jobs.RefreshAttributesJob +import org.thoughtcrime.securesms.jobs.RetrieveRemoteAnnouncementsJob +import org.thoughtcrime.securesms.jobs.RotateCertificateJob +import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob +import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob +import org.thoughtcrime.securesms.jobs.StorageSyncJob +import org.thoughtcrime.securesms.jobs.StoryOnboardingDownloadJob +import org.thoughtcrime.securesms.jobs.TypingSendJob +import org.thoughtcrime.securesms.net.DeviceTransferBlockingInterceptor +import org.thoughtcrime.securesms.util.TextSecurePreferences import org.whispersystems.signalservice.api.util.UptimeSleepTimer import org.whispersystems.signalservice.api.websocket.SignalWebSocket import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration +import org.whispersystems.signalservice.internal.websocket.BenchmarkWebSocketConnection import java.util.function.Supplier import kotlin.time.Duration.Companion.seconds @@ -20,12 +59,8 @@ class BenchmarkApplicationContext : ApplicationContext() { override fun initializeAppDependencies() { AppDependencies.init(this, BenchmarkDependencyProvider(this, ApplicationDependencyProvider(this))) - } - override fun beginJobLoop() = Unit - - fun beginJobLoopForTests() { - super.beginJobLoop() + DeviceTransferBlockingInterceptor.getInstance().blockNetwork() } override fun onForeground() = Unit @@ -36,11 +71,97 @@ class BenchmarkApplicationContext : ApplicationContext() { libSignalNetworkSupplier: Supplier ): SignalWebSocket.AuthenticatedWebSocket { return SignalWebSocket.AuthenticatedWebSocket( - connectionFactory = { BenchmarkWebSocketConnection.create() }, + connectionFactory = { BenchmarkWebSocketConnection.createAuthInstance() }, canConnect = { true }, sleepTimer = UptimeSleepTimer(), disconnectTimeoutMs = 15.seconds.inWholeMilliseconds ) } + + override fun provideUnauthWebSocket( + signalServiceConfigurationSupplier: Supplier, + libSignalNetworkSupplier: Supplier + ): SignalWebSocket.UnauthenticatedWebSocket { + return SignalWebSocket.UnauthenticatedWebSocket( + connectionFactory = { BenchmarkWebSocketConnection.createUnauthInstance() }, + canConnect = { true }, + sleepTimer = UptimeSleepTimer(), + disconnectTimeoutMs = 15.seconds.inWholeMilliseconds + ) + } + + override fun provideJobManager(): JobManager { + val config = JobManager.Configuration.Builder() + .setJobFactories(filterJobFactories(JobManagerFactories.getJobFactories(application))) + .setConstraintFactories(JobManagerFactories.getConstraintFactories(application)) + .setConstraintObservers(JobManagerFactories.getConstraintObservers(application)) + .setJobStorage(FastJobStorage(JobDatabase.getInstance(application))) + .setJobMigrator(JobMigrator(TextSecurePreferences.getJobManagerVersion(application), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(application))) + .addReservedJobRunner(FactoryJobPredicate(PushProcessMessageJob.KEY, MarkerJob.KEY)) + .addReservedJobRunner(FactoryJobPredicate(AttachmentUploadJob.KEY, AttachmentCompressionJob.KEY)) + .addReservedJobRunner( + FactoryJobPredicate( + IndividualSendJob.KEY, + PushGroupSendJob.KEY, + ReactionSendJob.KEY, + TypingSendJob.KEY, + GroupCallUpdateSendJob.KEY, + SendDeliveryReceiptJob.KEY + ) + ) + .build() + return JobManager(application, config) + } + + private fun filterJobFactories(jobFactories: Map>): Map> { + val blockedJobs = setOf( + AccountConsistencyWorkerJob.KEY, + ArchiveBackupIdReservationJob.KEY, + CreateReleaseChannelJob.KEY, + DirectoryRefreshJob.KEY, + DownloadLatestEmojiDataJob.KEY, + EmojiSearchIndexDownloadJob.KEY, + FontDownloaderJob.KEY, + GroupRingCleanupJob.KEY, + GroupV2UpdateSelfProfileKeyJob.KEY, + LinkedDeviceInactiveCheckJob.KEY, + MultiDeviceProfileKeyUpdateJob.KEY, + PostRegistrationBackupRedemptionJob.KEY, + PreKeysSyncJob.KEY, + ProfileUploadJob.KEY, + RefreshAttributesJob.KEY, + RetrieveRemoteAnnouncementsJob.KEY, + RotateCertificateJob.KEY, + StickerPackDownloadJob.KEY, + StorageSyncJob.KEY, + StoryOnboardingDownloadJob.KEY + ) + + return jobFactories.mapValues { + if (it.key in blockedJobs) { + NoOpJob.Factory() + } else { + it.value + } + } + } + } + + private class NoOpJob(parameters: Parameters) : Job(parameters) { + + companion object { + const val KEY = "NoOpJob" + } + + override fun serialize(): ByteArray? = null + override fun getFactoryKey(): String = KEY + override fun run(): Result = Result.success() + override fun onFailure() = Unit + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): NoOpJob { + return NoOpJob(parameters) + } + } } } diff --git a/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt b/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt index c1288d065e..92b15e0622 100644 --- a/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt +++ b/app/src/benchmarkShared/java/org/signal/benchmark/BenchmarkCommandReceiver.kt @@ -11,12 +11,12 @@ import android.content.Intent import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.signal.benchmark.network.BenchmarkWebSocketConnection import org.signal.benchmark.setup.Generator import org.signal.benchmark.setup.Harness import org.signal.core.util.ThreadUtil import org.signal.core.util.logging.Log import org.whispersystems.signalservice.internal.push.Envelope +import org.whispersystems.signalservice.internal.websocket.BenchmarkWebSocketConnection import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage import kotlin.random.Random @@ -46,8 +46,8 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { "individual-send" -> handlePrepareIndividualSend() "group-send" -> handlePrepareGroupSend() "release-messages" -> { - BenchmarkWebSocketConnection.instance.startWholeBatchTrace = true - BenchmarkWebSocketConnection.instance.releaseMessages() + BenchmarkWebSocketConnection.authInstance.startWholeBatchTrace = true + BenchmarkWebSocketConnection.authInstance.releaseMessages() } else -> Log.w(TAG, "Unknown command: $command") } @@ -61,7 +61,7 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { runBlocking { launch(Dispatchers.IO) { - BenchmarkWebSocketConnection.instance.run { + BenchmarkWebSocketConnection.authInstance.run { Log.i(TAG, "Sending initial message form Bob to establish session.") addPendingMessages(listOf(encryptedEnvelope.toWebSocketPayload())) releaseMessages() @@ -78,8 +78,8 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { val messages = envelopes.map { e -> e.toWebSocketPayload() } - BenchmarkWebSocketConnection.instance.addPendingMessages(messages) - BenchmarkWebSocketConnection.instance.addQueueEmptyMessage() + BenchmarkWebSocketConnection.authInstance.addPendingMessages(messages) + BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage() } private fun handlePrepareGroupSend() { @@ -90,7 +90,7 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { runBlocking { launch(Dispatchers.IO) { - BenchmarkWebSocketConnection.instance.run { + BenchmarkWebSocketConnection.authInstance.run { Log.i(TAG, "Sending initial group messages from client to establish sessions.") addPendingMessages(encryptedEnvelopes.map { it.toWebSocketPayload() }) releaseMessages() @@ -108,9 +108,9 @@ class BenchmarkCommandReceiver : BroadcastReceiver() { val messages = envelopes.map { e -> e.toWebSocketPayload() } - BenchmarkWebSocketConnection.instance.addPendingMessages(messages) + BenchmarkWebSocketConnection.authInstance.addPendingMessages(messages) } - BenchmarkWebSocketConnection.instance.addQueueEmptyMessage() + BenchmarkWebSocketConnection.authInstance.addQueueEmptyMessage() } private fun Envelope.toWebSocketPayload(): WebSocketRequestMessage { diff --git a/app/src/benchmarkShared/java/org/signal/benchmark/network/BenchmarkWebSocketConnection.kt b/app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt similarity index 75% rename from app/src/benchmarkShared/java/org/signal/benchmark/network/BenchmarkWebSocketConnection.kt rename to app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt index a812201e0b..9dd54f4c3a 100644 --- a/app/src/benchmarkShared/java/org/signal/benchmark/network/BenchmarkWebSocketConnection.kt +++ b/app/src/benchmarkShared/java/org/whispersystems/signalservice/internal/websocket/BenchmarkWebSocketConnection.kt @@ -3,18 +3,15 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -package org.signal.benchmark.network +package org.whispersystems.signalservice.internal.websocket import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.subjects.BehaviorSubject -import okio.IOException +import org.thoughtcrime.securesms.util.JsonUtils import org.thoughtcrime.securesms.util.SignalTrace import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState -import org.whispersystems.signalservice.internal.websocket.WebSocketConnection -import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage -import org.whispersystems.signalservice.internal.websocket.WebSocketResponseMessage -import org.whispersystems.signalservice.internal.websocket.WebsocketResponse +import org.whispersystems.signalservice.internal.push.SendMessageResponse import java.net.SocketException import java.util.LinkedList import java.util.Optional @@ -32,13 +29,22 @@ import java.util.concurrent.TimeoutException class BenchmarkWebSocketConnection : WebSocketConnection { companion object { - lateinit var instance: BenchmarkWebSocketConnection + lateinit var authInstance: BenchmarkWebSocketConnection private set @Synchronized - fun create(): WebSocketConnection { - instance = BenchmarkWebSocketConnection() - return instance + fun createAuthInstance(): WebSocketConnection { + authInstance = BenchmarkWebSocketConnection() + return authInstance + } + + lateinit var unauthInstance: BenchmarkWebSocketConnection + private set + + @Synchronized + fun createUnauthInstance(): WebSocketConnection { + unauthInstance = BenchmarkWebSocketConnection() + return unauthInstance } } @@ -118,12 +124,16 @@ class BenchmarkWebSocketConnection : WebSocketConnection { request: WebSocketRequestMessage, timeoutSeconds: Long ): Single { - return Single.error(IOException("fake timeout")) + if (request.verb != null && request.path != null) { + if (request.verb == "PUT" && request.path!!.startsWith("/v1/messages/")) { + return Single.just(WebsocketResponse(200, SendMessageResponse().toJson(), emptyList(), true)) + } + } + + return Single.error(okio.IOException("fake timeout")) } - override fun sendKeepAlive() { - error("Not yet implemented") - } + override fun sendKeepAlive() = Unit fun addQueueEmptyMessage() { addPendingMessages( @@ -136,3 +146,7 @@ class BenchmarkWebSocketConnection : WebSocketConnection { ) } } + +private fun Any.toJson(): String { + return JsonUtils.toJson(this) +}