From d35ec54c5cd0680006f31e3a764dc0a26de8450a Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Wed, 29 Jan 2025 14:49:37 -0500 Subject: [PATCH] Various backup performance improvements. --- .../securesms/backup/v2/BackupRepository.kt | 4 + .../v2/exporters/ChatItemArchiveExporter.kt | 112 ++++++++++----- .../v2/processor/RecipientArchiveProcessor.kt | 11 ++ .../signal/core/util/ParallelEventTimer.kt | 129 ++++++++++++++++++ 4 files changed, 221 insertions(+), 35 deletions(-) create mode 100644 core-util/src/main/java/org/signal/core/util/ParallelEventTimer.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index 979860b96b..4e75e4c7e5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -12,6 +12,7 @@ import androidx.annotation.Discouraged import androidx.annotation.WorkerThread import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext +import okio.ByteString import okio.ByteString.Companion.toByteString import org.greenrobot.eventbus.EventBus import org.signal.core.util.Base64 @@ -1524,7 +1525,10 @@ class ExportState(val backupTime: Long, val mediaBackupEnabled: Boolean) { val recipientIds: MutableSet = hashSetOf() val threadIds: MutableSet = hashSetOf() val contactRecipientIds: MutableSet = hashSetOf() + val groupRecipientIds: MutableSet = hashSetOf() val threadIdToRecipientId: MutableMap = hashMapOf() + val recipientIdToAci: MutableMap = hashMapOf() + val aciToRecipientId: MutableMap = hashMapOf() } class ImportState(val mediaRootBackupKey: MediaRootBackupKey) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/exporters/ChatItemArchiveExporter.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/exporters/ChatItemArchiveExporter.kt index a0a6d95a32..baf0aa5964 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/exporters/ChatItemArchiveExporter.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/exporters/ChatItemArchiveExporter.kt @@ -12,6 +12,7 @@ import org.json.JSONException import org.signal.core.util.Base64 import org.signal.core.util.EventTimer import org.signal.core.util.Hex +import org.signal.core.util.ParallelEventTimer import org.signal.core.util.concurrent.SignalExecutors import org.signal.core.util.logging.Log import org.signal.core.util.nullIfBlank @@ -28,7 +29,6 @@ import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.backup.v2.ExportOddities import org.thoughtcrime.securesms.backup.v2.ExportSkips import org.thoughtcrime.securesms.backup.v2.ExportState -import org.thoughtcrime.securesms.backup.v2.database.getThreadGroupStatus import org.thoughtcrime.securesms.backup.v2.proto.ChatItem import org.thoughtcrime.securesms.backup.v2.proto.ChatUpdateMessage import org.thoughtcrime.securesms.backup.v2.proto.ContactAttachment @@ -88,7 +88,6 @@ import org.thoughtcrime.securesms.payments.State import org.thoughtcrime.securesms.recipients.RecipientId import org.thoughtcrime.securesms.util.JsonUtils import org.thoughtcrime.securesms.util.MediaUtil -import org.whispersystems.signalservice.api.push.ServiceId.ACI import org.whispersystems.signalservice.api.util.UuidUtil import org.whispersystems.signalservice.api.util.toByteArray import java.io.Closeable @@ -98,7 +97,6 @@ import java.util.Queue import java.util.concurrent.Callable import java.util.concurrent.ExecutorService import java.util.concurrent.Future -import kotlin.jvm.optionals.getOrNull import kotlin.math.max import kotlin.time.Duration.Companion.days import org.thoughtcrime.securesms.backup.v2.proto.BodyRange as BackupBodyRange @@ -124,8 +122,15 @@ class ChatItemArchiveExporter( private val cursorGenerator: (Long, Int) -> Cursor ) : Iterator, Closeable { + /** Timer for more macro-level events, like fetching extra data vs transforming the data. */ private val eventTimer = EventTimer() + /** Timer for just the transformation process, to see what types of transformations are taking more time. */ + private val transformTimer = EventTimer() + + /** Timer for fetching extra data. */ + private val extraDataTimer = ParallelEventTimer() + /** * A queue of already-parsed ChatItems. Processing in batches means that we read ahead in the cursor and put * the pending items here. @@ -149,9 +154,11 @@ class ChatItemArchiveExporter( val extraData = fetchExtraMessageData(db, records.keys) eventTimer.emit("extra-data") + transformTimer.emit("ignore") for ((id, record) in records) { - val builder = record.toBasicChatItemBuilder(selfRecipientId, extraData.isGroupThreadById[id] ?: false, extraData.groupReceiptsById[id], exportState, backupStartTime) + val builder = record.toBasicChatItemBuilder(selfRecipientId, extraData.groupReceiptsById[id], exportState, backupStartTime) + transformTimer.emit("basic") if (builder == null) { continue @@ -160,10 +167,12 @@ class ChatItemArchiveExporter( when { record.remoteDeleted -> { builder.remoteDeletedMessage = RemoteDeletedMessage() + transformTimer.emit("remote-delete") } MessageTypes.isJoinedType(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.JOINED_SIGNAL) + transformTimer.emit("simple-update") } MessageTypes.isIdentityUpdate(record.type) -> { @@ -172,6 +181,7 @@ class ChatItemArchiveExporter( continue } builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.IDENTITY_UPDATE) + transformTimer.emit("simple-update") } MessageTypes.isIdentityVerified(record.type) -> { @@ -180,6 +190,7 @@ class ChatItemArchiveExporter( continue } builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.IDENTITY_VERIFIED) + transformTimer.emit("simple-update") } MessageTypes.isIdentityDefault(record.type) -> { @@ -188,65 +199,79 @@ class ChatItemArchiveExporter( continue } builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.IDENTITY_DEFAULT) + transformTimer.emit("simple-update") } MessageTypes.isChangeNumber(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.CHANGE_NUMBER) + transformTimer.emit("simple-update") } MessageTypes.isReleaseChannelDonationRequest(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.RELEASE_CHANNEL_DONATION_REQUEST) + transformTimer.emit("simple-update") } MessageTypes.isEndSessionType(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.END_SESSION) + transformTimer.emit("simple-update") } MessageTypes.isChatSessionRefresh(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.CHAT_SESSION_REFRESH) + transformTimer.emit("simple-update") } MessageTypes.isBadDecryptType(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.BAD_DECRYPT) + transformTimer.emit("simple-update") } MessageTypes.isPaymentsActivated(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.PAYMENTS_ACTIVATED) + transformTimer.emit("simple-update") } MessageTypes.isPaymentsRequestToActivate(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.PAYMENT_ACTIVATION_REQUEST) + transformTimer.emit("simple-update") } MessageTypes.isUnsupportedMessageType(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.UNSUPPORTED_PROTOCOL_MESSAGE) + transformTimer.emit("simple-update") } MessageTypes.isReportedSpam(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.REPORTED_SPAM) + transformTimer.emit("simple-update") } MessageTypes.isMessageRequestAccepted(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.MESSAGE_REQUEST_ACCEPTED) + transformTimer.emit("simple-update") } MessageTypes.isBlocked(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.BLOCKED) + transformTimer.emit("simple-update") } MessageTypes.isUnblocked(record.type) -> { builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.UNBLOCKED) + transformTimer.emit("simple-update") } MessageTypes.isExpirationTimerUpdate(record.type) -> { - if (db.threadTable.getThreadRecord(record.threadId)?.recipient?.isGroup == true) { - builder.updateMessage = record.toRemoteGroupExpireTimerUpdateFromGv1(db) ?: continue + if (exportState.threadIdToRecipientId[record.threadId] in exportState.groupRecipientIds) { + builder.updateMessage = record.toRemoteGroupExpireTimerUpdateFromGv1(exportState) ?: continue } else { builder.updateMessage = ChatUpdateMessage(expirationTimerChange = ExpirationTimerChatUpdate(record.expiresIn)) } builder.expireStartDate = null builder.expiresInMs = null + transformTimer.emit("expire-update") } MessageTypes.isProfileChange(record.type) -> { @@ -261,14 +286,17 @@ class ChatItemArchiveExporter( } builder.updateMessage = record.toRemoteProfileChangeUpdate() ?: continue + transformTimer.emit("profile-change") } MessageTypes.isSessionSwitchoverType(record.type) -> { builder.updateMessage = record.toRemoteSessionSwitchoverUpdate() + transformTimer.emit("sse") } MessageTypes.isThreadMergeType(record.type) -> { builder.updateMessage = record.toRemoteThreadMergeUpdate() ?: continue + transformTimer.emit("thread-merge") } MessageTypes.isGroupV2(record.type) && MessageTypes.isGroupUpdate(record.type) -> { @@ -278,10 +306,12 @@ class ChatItemArchiveExporter( continue } builder.updateMessage = update + transformTimer.emit("group-update-v2") } MessageTypes.isGroupUpdate(record.type) || MessageTypes.isGroupQuit(record.type) -> { - builder.updateMessage = record.toRemoteGroupUpdateFromGv1(db) ?: continue + builder.updateMessage = record.toRemoteGroupUpdateFromGv1(exportState) ?: continue + transformTimer.emit("group-update-v1") } MessageTypes.isGroupV1MigrationEvent(record.type) -> { @@ -290,11 +320,13 @@ class ChatItemArchiveExporter( updates = listOf(GroupChangeChatUpdate.Update(groupV2MigrationUpdate = GroupV2MigrationUpdate())) ) ) + transformTimer.emit("gv1-migration") } MessageTypes.isCallLog(record.type) -> { val call = db.callTable.getCallByMessageId(record.id) - builder.updateMessage = call?.toRemoteCallUpdate(db, record) ?: continue + builder.updateMessage = call?.toRemoteCallUpdate(exportState, record) ?: continue + transformTimer.emit("call-log") } MessageTypes.isPaymentsNotification(record.type) -> { @@ -303,18 +335,22 @@ class ChatItemArchiveExporter( continue } builder.paymentNotification = record.toRemotePaymentNotificationUpdate(db) + transformTimer.emit("payment") } MessageTypes.isGiftBadge(record.type) -> { builder.giftBadge = record.toRemoteGiftBadgeUpdate() ?: continue + transformTimer.emit("gift-badge") } !record.sharedContacts.isNullOrEmpty() -> { builder.contactMessage = record.toRemoteContactMessage(mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], attachments = extraData.attachmentsById[id]) ?: continue + transformTimer.emit("contact") } record.viewOnce -> { builder.viewOnceMessage = record.toRemoteViewOnceMessage(mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], attachments = extraData.attachmentsById[id]) + transformTimer.emit("voice") } record.parentStoryId != 0L -> { @@ -323,6 +359,7 @@ class ChatItemArchiveExporter( continue } builder.directStoryReplyMessage = record.toRemoteDirectStoryReplyMessage(mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], attachments = extraData.attachmentsById[record.id]) ?: continue + transformTimer.emit("story") } else -> { @@ -338,7 +375,7 @@ class ChatItemArchiveExporter( builder.stickerMessage = sticker.toRemoteStickerMessage(sentTimestamp = record.dateSent, mediaArchiveEnabled = mediaArchiveEnabled, reactions = extraData.reactionsById[id]) } else { val standardMessage = record.toRemoteStandardMessage( - db = db, + exportState = exportState, mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], mentions = extraData.mentionsById[id], @@ -351,6 +388,7 @@ class ChatItemArchiveExporter( } builder.standardMessage = standardMessage + transformTimer.emit("standard") } } } @@ -367,6 +405,7 @@ class ChatItemArchiveExporter( } previousEdits += builder.build() } + transformTimer.emit("revisions") } eventTimer.emit("transform") @@ -385,6 +424,8 @@ class ChatItemArchiveExporter( override fun close() { Log.d(TAG, "[ChatItemArchiveExporter][batchSize = $batchSize] ${eventTimer.stop().summary}") + Log.d(TAG, "[ChatItemArchiveExporterTransform][batchSize = $batchSize] ${transformTimer.stop().summary}") + Log.d(TAG, "[ChatItemArchiveExporterExtraData][batchSize = $batchSize] ${extraDataTimer.stop().summary}") } private fun readNextMessageRecordBatch(pastIds: Set): LinkedHashMap { @@ -404,37 +445,39 @@ class ChatItemArchiveExporter( val executor = SignalExecutors.BOUNDED val mentionsFuture = executor.submitTyped { - db.mentionTable.getMentionsForMessages(messageIds) + extraDataTimer.timeEvent("mentions") { + db.mentionTable.getMentionsForMessages(messageIds) + } } val reactionsFuture = executor.submitTyped { - db.reactionTable.getReactionsForMessages(messageIds) + extraDataTimer.timeEvent("reactions") { + db.reactionTable.getReactionsForMessages(messageIds) + } } val attachmentsFuture = executor.submitTyped { - db.attachmentTable.getAttachmentsForMessages(messageIds) + extraDataTimer.timeEvent("attachments") { + db.attachmentTable.getAttachmentsForMessages(messageIds) + } } val groupReceiptsFuture = executor.submitTyped { - db.groupReceiptTable.getGroupReceiptInfoForMessages(messageIds) - } - - val isGroupThreadFuture = executor.submitTyped { - db.threadTable.getThreadGroupStatus(messageIds) + extraDataTimer.timeEvent("group-receipts") { + db.groupReceiptTable.getGroupReceiptInfoForMessages(messageIds) + } } val mentionsResult = mentionsFuture.get() val reactionsResult = reactionsFuture.get() val attachmentsResult = attachmentsFuture.get() val groupReceiptsResult = groupReceiptsFuture.get() - val isGroupThreadResult = isGroupThreadFuture.get() return ExtraMessageData( mentionsById = mentionsResult, reactionsById = reactionsResult, attachmentsById = attachmentsResult, - groupReceiptsById = groupReceiptsResult, - isGroupThreadById = isGroupThreadResult + groupReceiptsById = groupReceiptsResult ) } } @@ -443,7 +486,7 @@ private fun simpleUpdate(type: SimpleChatUpdate.Type): ChatUpdateMessage { return ChatUpdateMessage(simpleUpdate = SimpleChatUpdate(type = type)) } -private fun BackupMessageRecord.toBasicChatItemBuilder(selfRecipientId: RecipientId, isGroupThread: Boolean, groupReceipts: List?, exportState: ExportState, backupStartTime: Long): ChatItem.Builder? { +private fun BackupMessageRecord.toBasicChatItemBuilder(selfRecipientId: RecipientId, groupReceipts: List?, exportState: ExportState, backupStartTime: Long): ChatItem.Builder? { val record = this if (this.threadId !in exportState.threadIds) { @@ -499,7 +542,7 @@ private fun BackupMessageRecord.toBasicChatItemBuilder(selfRecipientId: Recipien } Direction.OUTGOING -> { outgoing = ChatItem.OutgoingMessageDetails( - sendStatus = record.toRemoteSendStatus(isGroupThread, groupReceipts, exportState) + sendStatus = record.toRemoteSendStatus(isGroupThread = exportState.threadIdToRecipientId[this.chatId] in exportState.groupRecipientIds, groupReceipts = groupReceipts, exportState = exportState) ) if (expiresInMs != null && outgoing?.sendStatus?.all { it.pending == null && it.failed == null } == true) { @@ -607,8 +650,8 @@ private fun BackupMessageRecord.toRemoteGroupUpdate(): ChatUpdateMessage? { return null } -private fun BackupMessageRecord.toRemoteGroupUpdateFromGv1(db: SignalDatabase): ChatUpdateMessage? { - val aci = db.recipientTable.getRecord(RecipientId.from(this.fromRecipientId)).aci?.toByteString() ?: return null +private fun BackupMessageRecord.toRemoteGroupUpdateFromGv1(exportState: ExportState): ChatUpdateMessage? { + val aci = exportState.recipientIdToAci[this.fromRecipientId] ?: return null return ChatUpdateMessage( groupChange = GroupChangeChatUpdate( updates = listOf( @@ -622,8 +665,8 @@ private fun BackupMessageRecord.toRemoteGroupUpdateFromGv1(db: SignalDatabase): ) } -private fun BackupMessageRecord.toRemoteGroupExpireTimerUpdateFromGv1(db: SignalDatabase): ChatUpdateMessage? { - val updater = db.recipientTable.getRecord(RecipientId.from(this.fromRecipientId)).aci?.toByteString() ?: return null +private fun BackupMessageRecord.toRemoteGroupExpireTimerUpdateFromGv1(exportState: ExportState): ChatUpdateMessage? { + val updater = exportState.recipientIdToAci[this.fromRecipientId] ?: return null return ChatUpdateMessage( groupChange = GroupChangeChatUpdate( updates = listOf( @@ -638,7 +681,7 @@ private fun BackupMessageRecord.toRemoteGroupExpireTimerUpdateFromGv1(db: Signal ) } -private fun CallTable.Call.toRemoteCallUpdate(db: SignalDatabase, messageRecord: BackupMessageRecord): ChatUpdateMessage? { +private fun CallTable.Call.toRemoteCallUpdate(exportState: ExportState, messageRecord: BackupMessageRecord): ChatUpdateMessage? { return when (this.type) { CallTable.Type.GROUP_CALL -> { val groupCallUpdateDetails = GroupCallUpdateDetailsUtil.parse(messageRecord.body) @@ -660,7 +703,7 @@ private fun CallTable.Call.toRemoteCallUpdate(db: SignalDatabase, messageRecord: CallTable.Event.DELETE -> return null }, ringerRecipientId = this.ringerRecipient?.toLong(), - startedCallRecipientId = ACI.parseOrNull(groupCallUpdateDetails.startedCallUuid)?.let { db.recipientTable.getByAci(it).getOrNull()?.toLong() }, + startedCallRecipientId = groupCallUpdateDetails.startedCallUuid.takeIf { it.isNotEmpty() }?.let { exportState.aciToRecipientId[it] }, startedCallTimestamp = this.timestamp.clampToValidBackupRange(), endedCallTimestamp = groupCallUpdateDetails.endedCallTimestamp.clampToValidBackupRange().takeIf { it > 0 }, read = messageRecord.read @@ -930,11 +973,11 @@ private fun BackupMessageRecord.toRemoteDirectStoryReplyMessage(mediaArchiveEnab ) } -private fun BackupMessageRecord.toRemoteStandardMessage(db: SignalDatabase, mediaArchiveEnabled: Boolean, reactionRecords: List?, mentions: List?, attachments: List?): StandardMessage { +private fun BackupMessageRecord.toRemoteStandardMessage(exportState: ExportState, mediaArchiveEnabled: Boolean, reactionRecords: List?, mentions: List?, attachments: List?): StandardMessage { val text = body.nullIfBlank()?.let { Text( body = it, - bodyRanges = (this.bodyRanges?.toRemoteBodyRanges(this.dateSent) ?: emptyList()) + (mentions?.toRemoteBodyRanges(db) ?: emptyList()) + bodyRanges = (this.bodyRanges?.toRemoteBodyRanges(this.dateSent) ?: emptyList()) + (mentions?.toRemoteBodyRanges(exportState) ?: emptyList()) ) } @@ -988,7 +1031,7 @@ private fun BackupMessageRecord.toRemoteQuote(mediaArchiveEnabled: Boolean, atta attachments?.toRemoteQuoteAttachments(mediaArchiveEnabled) ?: emptyList() } - if (body == null && attachments.isEmpty()) { + if (remoteType == Quote.Type.NORMAL && body == null && attachments.isEmpty()) { Log.w(TAG, ExportOddities.emptyQuote(this.dateSent)) return null } @@ -1130,12 +1173,12 @@ private fun FailureReason?.toRemote(): PaymentNotification.TransactionDetails.Fa } } -private fun List.toRemoteBodyRanges(db: SignalDatabase): List { +private fun List.toRemoteBodyRanges(exportState: ExportState): List { return this.map { BackupBodyRange( start = it.start, length = it.length, - mentionAci = db.recipientTable.getRecord(it.recipientId).aci?.toByteString() + mentionAci = exportState.recipientIdToAci[it.recipientId.toLong()] ) } } @@ -1538,8 +1581,7 @@ private data class ExtraMessageData( val mentionsById: Map>, val reactionsById: Map>, val attachmentsById: Map>, - val groupReceiptsById: Map>, - val isGroupThreadById: Map + val groupReceiptsById: Map> ) private enum class Direction { diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/RecipientArchiveProcessor.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/RecipientArchiveProcessor.kt index 27c0390c90..e0932c15fb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/RecipientArchiveProcessor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/processor/RecipientArchiveProcessor.kt @@ -35,6 +35,11 @@ object RecipientArchiveProcessor { val TAG = Log.tag(RecipientArchiveProcessor::class.java) fun export(db: SignalDatabase, signalStore: SignalStore, exportState: ExportState, selfRecipientId: RecipientId, selfAci: ServiceId.ACI, emitter: BackupFrameEmitter) { + exportState.recipientIds.add(selfRecipientId.toLong()) + exportState.contactRecipientIds.add(selfRecipientId.toLong()) + exportState.recipientIdToAci[selfRecipientId.toLong()] = selfAci.toByteString() + exportState.aciToRecipientId[selfAci.toString()] = selfRecipientId.toLong() + val releaseChannelId = signalStore.releaseChannelValues.releaseChannelRecipientId if (releaseChannelId != null) { exportState.recipientIds.add(releaseChannelId.toLong()) @@ -56,6 +61,11 @@ object RecipientArchiveProcessor { if (recipient != null) { exportState.recipientIds.add(recipient.id) exportState.contactRecipientIds.add(recipient.id) + recipient.contact?.aci?.let { + exportState.recipientIdToAci[recipient.id] = it + exportState.aciToRecipientId[ServiceId.ACI.parseOrThrow(it).toString()] = recipient.id + } + emitter.emit(Frame(recipient = recipient)) } } @@ -64,6 +74,7 @@ object RecipientArchiveProcessor { db.recipientTable.getGroupsForBackup(selfAci).use { reader -> for (recipient in reader) { exportState.recipientIds.add(recipient.id) + exportState.groupRecipientIds.add(recipient.id) emitter.emit(Frame(recipient = recipient)) } } diff --git a/core-util/src/main/java/org/signal/core/util/ParallelEventTimer.kt b/core-util/src/main/java/org/signal/core/util/ParallelEventTimer.kt new file mode 100644 index 0000000000..54aec2a9ec --- /dev/null +++ b/core-util/src/main/java/org/signal/core/util/ParallelEventTimer.kt @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.signal.core.util + +import java.util.Queue +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue +import kotlin.math.ceil +import kotlin.math.floor +import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.DurationUnit + +/** + * Used to track performance metrics for large clusters of similar events that are happening simultaneously. + * + * Very similar to [EventTimer], but with no assumptions around threading, + * + * The timer tracks things at nanosecond granularity, but presents data as fractional milliseconds for readability. + */ +class ParallelEventTimer { + + val durationsByGroup: MutableMap> = ConcurrentHashMap() + private var startTime = System.nanoTime() + + fun reset() { + durationsByGroup.clear() + startTime = System.nanoTime() + } + + /** + * Begin an event associated with a group. You must call [EventStopper.stopEvent] on the returned object in order to indicate the action has completed. + */ + fun beginEvent(group: String): EventStopper { + val start = System.nanoTime() + return EventStopper { + val duration = System.nanoTime() - start + durationsByGroup.computeIfAbsent(group) { ConcurrentLinkedQueue() } += duration + } + } + + /** + * Time an event associated with a group. + */ + inline fun timeEvent(group: String, operation: () -> E): E { + val start = System.nanoTime() + val result = operation() + val duration = System.nanoTime() - start + durationsByGroup.computeIfAbsent(group) { ConcurrentLinkedQueue() } += duration + return result + } + + /** + * Stops the timer and returns a mapping of group -> [EventMetrics], which will tell you various statistics around timings for that group. + * It is assumed that all events have been stopped by the time this has been called. + */ + fun stop(): EventTimerResults { + val totalDuration = System.nanoTime() - startTime + + val data: Map = durationsByGroup + .mapValues { entry -> + val sorted: List = entry.value.sorted() + + EventMetrics( + totalEventTime = sorted.sum().nanoseconds.toDouble(DurationUnit.MILLISECONDS), + eventCount = sorted.size, + sortedDurationNanos = sorted + ) + } + + return EventTimerResults(totalDuration.nanoseconds.toDouble(DurationUnit.MILLISECONDS), data) + } + + class EventTimerResults(totalWallTime: Double, data: Map) : Map by data { + val summary by lazy { + val builder = StringBuilder() + + builder.append("[overall] totalWallTime: ${totalWallTime.roundedString(2)}, totalEventTime: ${data.values.map { it.totalEventTime}.sum().roundedString(2)} ") + + for (entry in data) { + builder.append("[${entry.key}] totalEventTime: ${entry.value.totalEventTime.roundedString(2)}, count: ${entry.value.eventCount}, p50: ${entry.value.p(50)}, p90: ${entry.value.p(90)}, p99: ${entry.value.p(99)} ") + } + + builder.toString() + } + } + + fun interface EventStopper { + fun stopEvent() + } + + data class EventMetrics( + /** The sum of all event times, in fractional milliseconds. If running operations in parallel, this will likely be larger than [totalWallTime]. */ + val totalEventTime: Double, + /** Total number of events observed. */ + val eventCount: Int, + private val sortedDurationNanos: List + ) { + + /** + * Returns the percentile of the duration data (e.g. p50, p90) as a formatted string containing fractional milliseconds rounded to the requested number of decimal places. + */ + fun p(percentile: Int, decimalPlaces: Int = 2): String { + return pNanos(percentile).nanoseconds.toDouble(DurationUnit.MILLISECONDS).roundedString(decimalPlaces) + } + + private fun pNanos(percentile: Int): Long { + if (sortedDurationNanos.isEmpty()) { + return 0L + } + + val index: Float = (percentile / 100f) * (sortedDurationNanos.size - 1) + val lowerIndex: Int = floor(index).toInt() + val upperIndex: Int = ceil(index).toInt() + + if (lowerIndex == upperIndex) { + return sortedDurationNanos[lowerIndex] + } + + val interpolationFactor: Float = index - lowerIndex + val lowerValue: Long = sortedDurationNanos[lowerIndex] + val upperValue: Long = sortedDurationNanos[upperIndex] + + return floor(lowerValue + (upperValue - lowerValue) * interpolationFactor).toLong() + } + } +}