mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-02-15 07:28:30 +00:00
Various backup performance improvements.
This commit is contained in:
@@ -12,6 +12,7 @@ import androidx.annotation.Discouraged
|
||||
import androidx.annotation.WorkerThread
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import okio.ByteString
|
||||
import okio.ByteString.Companion.toByteString
|
||||
import org.greenrobot.eventbus.EventBus
|
||||
import org.signal.core.util.Base64
|
||||
@@ -1524,7 +1525,10 @@ class ExportState(val backupTime: Long, val mediaBackupEnabled: Boolean) {
|
||||
val recipientIds: MutableSet<Long> = hashSetOf()
|
||||
val threadIds: MutableSet<Long> = hashSetOf()
|
||||
val contactRecipientIds: MutableSet<Long> = hashSetOf()
|
||||
val groupRecipientIds: MutableSet<Long> = hashSetOf()
|
||||
val threadIdToRecipientId: MutableMap<Long, Long> = hashMapOf()
|
||||
val recipientIdToAci: MutableMap<Long, ByteString> = hashMapOf()
|
||||
val aciToRecipientId: MutableMap<String, Long> = hashMapOf()
|
||||
}
|
||||
|
||||
class ImportState(val mediaRootBackupKey: MediaRootBackupKey) {
|
||||
|
||||
@@ -12,6 +12,7 @@ import org.json.JSONException
|
||||
import org.signal.core.util.Base64
|
||||
import org.signal.core.util.EventTimer
|
||||
import org.signal.core.util.Hex
|
||||
import org.signal.core.util.ParallelEventTimer
|
||||
import org.signal.core.util.concurrent.SignalExecutors
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.nullIfBlank
|
||||
@@ -28,7 +29,6 @@ import org.thoughtcrime.securesms.attachments.DatabaseAttachment
|
||||
import org.thoughtcrime.securesms.backup.v2.ExportOddities
|
||||
import org.thoughtcrime.securesms.backup.v2.ExportSkips
|
||||
import org.thoughtcrime.securesms.backup.v2.ExportState
|
||||
import org.thoughtcrime.securesms.backup.v2.database.getThreadGroupStatus
|
||||
import org.thoughtcrime.securesms.backup.v2.proto.ChatItem
|
||||
import org.thoughtcrime.securesms.backup.v2.proto.ChatUpdateMessage
|
||||
import org.thoughtcrime.securesms.backup.v2.proto.ContactAttachment
|
||||
@@ -88,7 +88,6 @@ import org.thoughtcrime.securesms.payments.State
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
import org.thoughtcrime.securesms.util.JsonUtils
|
||||
import org.thoughtcrime.securesms.util.MediaUtil
|
||||
import org.whispersystems.signalservice.api.push.ServiceId.ACI
|
||||
import org.whispersystems.signalservice.api.util.UuidUtil
|
||||
import org.whispersystems.signalservice.api.util.toByteArray
|
||||
import java.io.Closeable
|
||||
@@ -98,7 +97,6 @@ import java.util.Queue
|
||||
import java.util.concurrent.Callable
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Future
|
||||
import kotlin.jvm.optionals.getOrNull
|
||||
import kotlin.math.max
|
||||
import kotlin.time.Duration.Companion.days
|
||||
import org.thoughtcrime.securesms.backup.v2.proto.BodyRange as BackupBodyRange
|
||||
@@ -124,8 +122,15 @@ class ChatItemArchiveExporter(
|
||||
private val cursorGenerator: (Long, Int) -> Cursor
|
||||
) : Iterator<ChatItem?>, Closeable {
|
||||
|
||||
/** Timer for more macro-level events, like fetching extra data vs transforming the data. */
|
||||
private val eventTimer = EventTimer()
|
||||
|
||||
/** Timer for just the transformation process, to see what types of transformations are taking more time. */
|
||||
private val transformTimer = EventTimer()
|
||||
|
||||
/** Timer for fetching extra data. */
|
||||
private val extraDataTimer = ParallelEventTimer()
|
||||
|
||||
/**
|
||||
* A queue of already-parsed ChatItems. Processing in batches means that we read ahead in the cursor and put
|
||||
* the pending items here.
|
||||
@@ -149,9 +154,11 @@ class ChatItemArchiveExporter(
|
||||
|
||||
val extraData = fetchExtraMessageData(db, records.keys)
|
||||
eventTimer.emit("extra-data")
|
||||
transformTimer.emit("ignore")
|
||||
|
||||
for ((id, record) in records) {
|
||||
val builder = record.toBasicChatItemBuilder(selfRecipientId, extraData.isGroupThreadById[id] ?: false, extraData.groupReceiptsById[id], exportState, backupStartTime)
|
||||
val builder = record.toBasicChatItemBuilder(selfRecipientId, extraData.groupReceiptsById[id], exportState, backupStartTime)
|
||||
transformTimer.emit("basic")
|
||||
|
||||
if (builder == null) {
|
||||
continue
|
||||
@@ -160,10 +167,12 @@ class ChatItemArchiveExporter(
|
||||
when {
|
||||
record.remoteDeleted -> {
|
||||
builder.remoteDeletedMessage = RemoteDeletedMessage()
|
||||
transformTimer.emit("remote-delete")
|
||||
}
|
||||
|
||||
MessageTypes.isJoinedType(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.JOINED_SIGNAL)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isIdentityUpdate(record.type) -> {
|
||||
@@ -172,6 +181,7 @@ class ChatItemArchiveExporter(
|
||||
continue
|
||||
}
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.IDENTITY_UPDATE)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isIdentityVerified(record.type) -> {
|
||||
@@ -180,6 +190,7 @@ class ChatItemArchiveExporter(
|
||||
continue
|
||||
}
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.IDENTITY_VERIFIED)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isIdentityDefault(record.type) -> {
|
||||
@@ -188,65 +199,79 @@ class ChatItemArchiveExporter(
|
||||
continue
|
||||
}
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.IDENTITY_DEFAULT)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isChangeNumber(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.CHANGE_NUMBER)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isReleaseChannelDonationRequest(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.RELEASE_CHANNEL_DONATION_REQUEST)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isEndSessionType(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.END_SESSION)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isChatSessionRefresh(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.CHAT_SESSION_REFRESH)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isBadDecryptType(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.BAD_DECRYPT)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isPaymentsActivated(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.PAYMENTS_ACTIVATED)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isPaymentsRequestToActivate(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.PAYMENT_ACTIVATION_REQUEST)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isUnsupportedMessageType(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.UNSUPPORTED_PROTOCOL_MESSAGE)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isReportedSpam(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.REPORTED_SPAM)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isMessageRequestAccepted(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.MESSAGE_REQUEST_ACCEPTED)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isBlocked(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.BLOCKED)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isUnblocked(record.type) -> {
|
||||
builder.updateMessage = simpleUpdate(SimpleChatUpdate.Type.UNBLOCKED)
|
||||
transformTimer.emit("simple-update")
|
||||
}
|
||||
|
||||
MessageTypes.isExpirationTimerUpdate(record.type) -> {
|
||||
if (db.threadTable.getThreadRecord(record.threadId)?.recipient?.isGroup == true) {
|
||||
builder.updateMessage = record.toRemoteGroupExpireTimerUpdateFromGv1(db) ?: continue
|
||||
if (exportState.threadIdToRecipientId[record.threadId] in exportState.groupRecipientIds) {
|
||||
builder.updateMessage = record.toRemoteGroupExpireTimerUpdateFromGv1(exportState) ?: continue
|
||||
} else {
|
||||
builder.updateMessage = ChatUpdateMessage(expirationTimerChange = ExpirationTimerChatUpdate(record.expiresIn))
|
||||
}
|
||||
|
||||
builder.expireStartDate = null
|
||||
builder.expiresInMs = null
|
||||
transformTimer.emit("expire-update")
|
||||
}
|
||||
|
||||
MessageTypes.isProfileChange(record.type) -> {
|
||||
@@ -261,14 +286,17 @@ class ChatItemArchiveExporter(
|
||||
}
|
||||
|
||||
builder.updateMessage = record.toRemoteProfileChangeUpdate() ?: continue
|
||||
transformTimer.emit("profile-change")
|
||||
}
|
||||
|
||||
MessageTypes.isSessionSwitchoverType(record.type) -> {
|
||||
builder.updateMessage = record.toRemoteSessionSwitchoverUpdate()
|
||||
transformTimer.emit("sse")
|
||||
}
|
||||
|
||||
MessageTypes.isThreadMergeType(record.type) -> {
|
||||
builder.updateMessage = record.toRemoteThreadMergeUpdate() ?: continue
|
||||
transformTimer.emit("thread-merge")
|
||||
}
|
||||
|
||||
MessageTypes.isGroupV2(record.type) && MessageTypes.isGroupUpdate(record.type) -> {
|
||||
@@ -278,10 +306,12 @@ class ChatItemArchiveExporter(
|
||||
continue
|
||||
}
|
||||
builder.updateMessage = update
|
||||
transformTimer.emit("group-update-v2")
|
||||
}
|
||||
|
||||
MessageTypes.isGroupUpdate(record.type) || MessageTypes.isGroupQuit(record.type) -> {
|
||||
builder.updateMessage = record.toRemoteGroupUpdateFromGv1(db) ?: continue
|
||||
builder.updateMessage = record.toRemoteGroupUpdateFromGv1(exportState) ?: continue
|
||||
transformTimer.emit("group-update-v1")
|
||||
}
|
||||
|
||||
MessageTypes.isGroupV1MigrationEvent(record.type) -> {
|
||||
@@ -290,11 +320,13 @@ class ChatItemArchiveExporter(
|
||||
updates = listOf(GroupChangeChatUpdate.Update(groupV2MigrationUpdate = GroupV2MigrationUpdate()))
|
||||
)
|
||||
)
|
||||
transformTimer.emit("gv1-migration")
|
||||
}
|
||||
|
||||
MessageTypes.isCallLog(record.type) -> {
|
||||
val call = db.callTable.getCallByMessageId(record.id)
|
||||
builder.updateMessage = call?.toRemoteCallUpdate(db, record) ?: continue
|
||||
builder.updateMessage = call?.toRemoteCallUpdate(exportState, record) ?: continue
|
||||
transformTimer.emit("call-log")
|
||||
}
|
||||
|
||||
MessageTypes.isPaymentsNotification(record.type) -> {
|
||||
@@ -303,18 +335,22 @@ class ChatItemArchiveExporter(
|
||||
continue
|
||||
}
|
||||
builder.paymentNotification = record.toRemotePaymentNotificationUpdate(db)
|
||||
transformTimer.emit("payment")
|
||||
}
|
||||
|
||||
MessageTypes.isGiftBadge(record.type) -> {
|
||||
builder.giftBadge = record.toRemoteGiftBadgeUpdate() ?: continue
|
||||
transformTimer.emit("gift-badge")
|
||||
}
|
||||
|
||||
!record.sharedContacts.isNullOrEmpty() -> {
|
||||
builder.contactMessage = record.toRemoteContactMessage(mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], attachments = extraData.attachmentsById[id]) ?: continue
|
||||
transformTimer.emit("contact")
|
||||
}
|
||||
|
||||
record.viewOnce -> {
|
||||
builder.viewOnceMessage = record.toRemoteViewOnceMessage(mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], attachments = extraData.attachmentsById[id])
|
||||
transformTimer.emit("voice")
|
||||
}
|
||||
|
||||
record.parentStoryId != 0L -> {
|
||||
@@ -323,6 +359,7 @@ class ChatItemArchiveExporter(
|
||||
continue
|
||||
}
|
||||
builder.directStoryReplyMessage = record.toRemoteDirectStoryReplyMessage(mediaArchiveEnabled = mediaArchiveEnabled, reactionRecords = extraData.reactionsById[id], attachments = extraData.attachmentsById[record.id]) ?: continue
|
||||
transformTimer.emit("story")
|
||||
}
|
||||
|
||||
else -> {
|
||||
@@ -338,7 +375,7 @@ class ChatItemArchiveExporter(
|
||||
builder.stickerMessage = sticker.toRemoteStickerMessage(sentTimestamp = record.dateSent, mediaArchiveEnabled = mediaArchiveEnabled, reactions = extraData.reactionsById[id])
|
||||
} else {
|
||||
val standardMessage = record.toRemoteStandardMessage(
|
||||
db = db,
|
||||
exportState = exportState,
|
||||
mediaArchiveEnabled = mediaArchiveEnabled,
|
||||
reactionRecords = extraData.reactionsById[id],
|
||||
mentions = extraData.mentionsById[id],
|
||||
@@ -351,6 +388,7 @@ class ChatItemArchiveExporter(
|
||||
}
|
||||
|
||||
builder.standardMessage = standardMessage
|
||||
transformTimer.emit("standard")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -367,6 +405,7 @@ class ChatItemArchiveExporter(
|
||||
}
|
||||
previousEdits += builder.build()
|
||||
}
|
||||
transformTimer.emit("revisions")
|
||||
}
|
||||
eventTimer.emit("transform")
|
||||
|
||||
@@ -385,6 +424,8 @@ class ChatItemArchiveExporter(
|
||||
|
||||
override fun close() {
|
||||
Log.d(TAG, "[ChatItemArchiveExporter][batchSize = $batchSize] ${eventTimer.stop().summary}")
|
||||
Log.d(TAG, "[ChatItemArchiveExporterTransform][batchSize = $batchSize] ${transformTimer.stop().summary}")
|
||||
Log.d(TAG, "[ChatItemArchiveExporterExtraData][batchSize = $batchSize] ${extraDataTimer.stop().summary}")
|
||||
}
|
||||
|
||||
private fun readNextMessageRecordBatch(pastIds: Set<Long>): LinkedHashMap<Long, BackupMessageRecord> {
|
||||
@@ -404,37 +445,39 @@ class ChatItemArchiveExporter(
|
||||
val executor = SignalExecutors.BOUNDED
|
||||
|
||||
val mentionsFuture = executor.submitTyped {
|
||||
db.mentionTable.getMentionsForMessages(messageIds)
|
||||
extraDataTimer.timeEvent("mentions") {
|
||||
db.mentionTable.getMentionsForMessages(messageIds)
|
||||
}
|
||||
}
|
||||
|
||||
val reactionsFuture = executor.submitTyped {
|
||||
db.reactionTable.getReactionsForMessages(messageIds)
|
||||
extraDataTimer.timeEvent("reactions") {
|
||||
db.reactionTable.getReactionsForMessages(messageIds)
|
||||
}
|
||||
}
|
||||
|
||||
val attachmentsFuture = executor.submitTyped {
|
||||
db.attachmentTable.getAttachmentsForMessages(messageIds)
|
||||
extraDataTimer.timeEvent("attachments") {
|
||||
db.attachmentTable.getAttachmentsForMessages(messageIds)
|
||||
}
|
||||
}
|
||||
|
||||
val groupReceiptsFuture = executor.submitTyped {
|
||||
db.groupReceiptTable.getGroupReceiptInfoForMessages(messageIds)
|
||||
}
|
||||
|
||||
val isGroupThreadFuture = executor.submitTyped {
|
||||
db.threadTable.getThreadGroupStatus(messageIds)
|
||||
extraDataTimer.timeEvent("group-receipts") {
|
||||
db.groupReceiptTable.getGroupReceiptInfoForMessages(messageIds)
|
||||
}
|
||||
}
|
||||
|
||||
val mentionsResult = mentionsFuture.get()
|
||||
val reactionsResult = reactionsFuture.get()
|
||||
val attachmentsResult = attachmentsFuture.get()
|
||||
val groupReceiptsResult = groupReceiptsFuture.get()
|
||||
val isGroupThreadResult = isGroupThreadFuture.get()
|
||||
|
||||
return ExtraMessageData(
|
||||
mentionsById = mentionsResult,
|
||||
reactionsById = reactionsResult,
|
||||
attachmentsById = attachmentsResult,
|
||||
groupReceiptsById = groupReceiptsResult,
|
||||
isGroupThreadById = isGroupThreadResult
|
||||
groupReceiptsById = groupReceiptsResult
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -443,7 +486,7 @@ private fun simpleUpdate(type: SimpleChatUpdate.Type): ChatUpdateMessage {
|
||||
return ChatUpdateMessage(simpleUpdate = SimpleChatUpdate(type = type))
|
||||
}
|
||||
|
||||
private fun BackupMessageRecord.toBasicChatItemBuilder(selfRecipientId: RecipientId, isGroupThread: Boolean, groupReceipts: List<GroupReceiptTable.GroupReceiptInfo>?, exportState: ExportState, backupStartTime: Long): ChatItem.Builder? {
|
||||
private fun BackupMessageRecord.toBasicChatItemBuilder(selfRecipientId: RecipientId, groupReceipts: List<GroupReceiptTable.GroupReceiptInfo>?, exportState: ExportState, backupStartTime: Long): ChatItem.Builder? {
|
||||
val record = this
|
||||
|
||||
if (this.threadId !in exportState.threadIds) {
|
||||
@@ -499,7 +542,7 @@ private fun BackupMessageRecord.toBasicChatItemBuilder(selfRecipientId: Recipien
|
||||
}
|
||||
Direction.OUTGOING -> {
|
||||
outgoing = ChatItem.OutgoingMessageDetails(
|
||||
sendStatus = record.toRemoteSendStatus(isGroupThread, groupReceipts, exportState)
|
||||
sendStatus = record.toRemoteSendStatus(isGroupThread = exportState.threadIdToRecipientId[this.chatId] in exportState.groupRecipientIds, groupReceipts = groupReceipts, exportState = exportState)
|
||||
)
|
||||
|
||||
if (expiresInMs != null && outgoing?.sendStatus?.all { it.pending == null && it.failed == null } == true) {
|
||||
@@ -607,8 +650,8 @@ private fun BackupMessageRecord.toRemoteGroupUpdate(): ChatUpdateMessage? {
|
||||
return null
|
||||
}
|
||||
|
||||
private fun BackupMessageRecord.toRemoteGroupUpdateFromGv1(db: SignalDatabase): ChatUpdateMessage? {
|
||||
val aci = db.recipientTable.getRecord(RecipientId.from(this.fromRecipientId)).aci?.toByteString() ?: return null
|
||||
private fun BackupMessageRecord.toRemoteGroupUpdateFromGv1(exportState: ExportState): ChatUpdateMessage? {
|
||||
val aci = exportState.recipientIdToAci[this.fromRecipientId] ?: return null
|
||||
return ChatUpdateMessage(
|
||||
groupChange = GroupChangeChatUpdate(
|
||||
updates = listOf(
|
||||
@@ -622,8 +665,8 @@ private fun BackupMessageRecord.toRemoteGroupUpdateFromGv1(db: SignalDatabase):
|
||||
)
|
||||
}
|
||||
|
||||
private fun BackupMessageRecord.toRemoteGroupExpireTimerUpdateFromGv1(db: SignalDatabase): ChatUpdateMessage? {
|
||||
val updater = db.recipientTable.getRecord(RecipientId.from(this.fromRecipientId)).aci?.toByteString() ?: return null
|
||||
private fun BackupMessageRecord.toRemoteGroupExpireTimerUpdateFromGv1(exportState: ExportState): ChatUpdateMessage? {
|
||||
val updater = exportState.recipientIdToAci[this.fromRecipientId] ?: return null
|
||||
return ChatUpdateMessage(
|
||||
groupChange = GroupChangeChatUpdate(
|
||||
updates = listOf(
|
||||
@@ -638,7 +681,7 @@ private fun BackupMessageRecord.toRemoteGroupExpireTimerUpdateFromGv1(db: Signal
|
||||
)
|
||||
}
|
||||
|
||||
private fun CallTable.Call.toRemoteCallUpdate(db: SignalDatabase, messageRecord: BackupMessageRecord): ChatUpdateMessage? {
|
||||
private fun CallTable.Call.toRemoteCallUpdate(exportState: ExportState, messageRecord: BackupMessageRecord): ChatUpdateMessage? {
|
||||
return when (this.type) {
|
||||
CallTable.Type.GROUP_CALL -> {
|
||||
val groupCallUpdateDetails = GroupCallUpdateDetailsUtil.parse(messageRecord.body)
|
||||
@@ -660,7 +703,7 @@ private fun CallTable.Call.toRemoteCallUpdate(db: SignalDatabase, messageRecord:
|
||||
CallTable.Event.DELETE -> return null
|
||||
},
|
||||
ringerRecipientId = this.ringerRecipient?.toLong(),
|
||||
startedCallRecipientId = ACI.parseOrNull(groupCallUpdateDetails.startedCallUuid)?.let { db.recipientTable.getByAci(it).getOrNull()?.toLong() },
|
||||
startedCallRecipientId = groupCallUpdateDetails.startedCallUuid.takeIf { it.isNotEmpty() }?.let { exportState.aciToRecipientId[it] },
|
||||
startedCallTimestamp = this.timestamp.clampToValidBackupRange(),
|
||||
endedCallTimestamp = groupCallUpdateDetails.endedCallTimestamp.clampToValidBackupRange().takeIf { it > 0 },
|
||||
read = messageRecord.read
|
||||
@@ -930,11 +973,11 @@ private fun BackupMessageRecord.toRemoteDirectStoryReplyMessage(mediaArchiveEnab
|
||||
)
|
||||
}
|
||||
|
||||
private fun BackupMessageRecord.toRemoteStandardMessage(db: SignalDatabase, mediaArchiveEnabled: Boolean, reactionRecords: List<ReactionRecord>?, mentions: List<Mention>?, attachments: List<DatabaseAttachment>?): StandardMessage {
|
||||
private fun BackupMessageRecord.toRemoteStandardMessage(exportState: ExportState, mediaArchiveEnabled: Boolean, reactionRecords: List<ReactionRecord>?, mentions: List<Mention>?, attachments: List<DatabaseAttachment>?): StandardMessage {
|
||||
val text = body.nullIfBlank()?.let {
|
||||
Text(
|
||||
body = it,
|
||||
bodyRanges = (this.bodyRanges?.toRemoteBodyRanges(this.dateSent) ?: emptyList()) + (mentions?.toRemoteBodyRanges(db) ?: emptyList())
|
||||
bodyRanges = (this.bodyRanges?.toRemoteBodyRanges(this.dateSent) ?: emptyList()) + (mentions?.toRemoteBodyRanges(exportState) ?: emptyList())
|
||||
)
|
||||
}
|
||||
|
||||
@@ -988,7 +1031,7 @@ private fun BackupMessageRecord.toRemoteQuote(mediaArchiveEnabled: Boolean, atta
|
||||
attachments?.toRemoteQuoteAttachments(mediaArchiveEnabled) ?: emptyList()
|
||||
}
|
||||
|
||||
if (body == null && attachments.isEmpty()) {
|
||||
if (remoteType == Quote.Type.NORMAL && body == null && attachments.isEmpty()) {
|
||||
Log.w(TAG, ExportOddities.emptyQuote(this.dateSent))
|
||||
return null
|
||||
}
|
||||
@@ -1130,12 +1173,12 @@ private fun FailureReason?.toRemote(): PaymentNotification.TransactionDetails.Fa
|
||||
}
|
||||
}
|
||||
|
||||
private fun List<Mention>.toRemoteBodyRanges(db: SignalDatabase): List<BackupBodyRange> {
|
||||
private fun List<Mention>.toRemoteBodyRanges(exportState: ExportState): List<BackupBodyRange> {
|
||||
return this.map {
|
||||
BackupBodyRange(
|
||||
start = it.start,
|
||||
length = it.length,
|
||||
mentionAci = db.recipientTable.getRecord(it.recipientId).aci?.toByteString()
|
||||
mentionAci = exportState.recipientIdToAci[it.recipientId.toLong()]
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1538,8 +1581,7 @@ private data class ExtraMessageData(
|
||||
val mentionsById: Map<Long, List<Mention>>,
|
||||
val reactionsById: Map<Long, List<ReactionRecord>>,
|
||||
val attachmentsById: Map<Long, List<DatabaseAttachment>>,
|
||||
val groupReceiptsById: Map<Long, List<GroupReceiptTable.GroupReceiptInfo>>,
|
||||
val isGroupThreadById: Map<Long, Boolean>
|
||||
val groupReceiptsById: Map<Long, List<GroupReceiptTable.GroupReceiptInfo>>
|
||||
)
|
||||
|
||||
private enum class Direction {
|
||||
|
||||
@@ -35,6 +35,11 @@ object RecipientArchiveProcessor {
|
||||
val TAG = Log.tag(RecipientArchiveProcessor::class.java)
|
||||
|
||||
fun export(db: SignalDatabase, signalStore: SignalStore, exportState: ExportState, selfRecipientId: RecipientId, selfAci: ServiceId.ACI, emitter: BackupFrameEmitter) {
|
||||
exportState.recipientIds.add(selfRecipientId.toLong())
|
||||
exportState.contactRecipientIds.add(selfRecipientId.toLong())
|
||||
exportState.recipientIdToAci[selfRecipientId.toLong()] = selfAci.toByteString()
|
||||
exportState.aciToRecipientId[selfAci.toString()] = selfRecipientId.toLong()
|
||||
|
||||
val releaseChannelId = signalStore.releaseChannelValues.releaseChannelRecipientId
|
||||
if (releaseChannelId != null) {
|
||||
exportState.recipientIds.add(releaseChannelId.toLong())
|
||||
@@ -56,6 +61,11 @@ object RecipientArchiveProcessor {
|
||||
if (recipient != null) {
|
||||
exportState.recipientIds.add(recipient.id)
|
||||
exportState.contactRecipientIds.add(recipient.id)
|
||||
recipient.contact?.aci?.let {
|
||||
exportState.recipientIdToAci[recipient.id] = it
|
||||
exportState.aciToRecipientId[ServiceId.ACI.parseOrThrow(it).toString()] = recipient.id
|
||||
}
|
||||
|
||||
emitter.emit(Frame(recipient = recipient))
|
||||
}
|
||||
}
|
||||
@@ -64,6 +74,7 @@ object RecipientArchiveProcessor {
|
||||
db.recipientTable.getGroupsForBackup(selfAci).use { reader ->
|
||||
for (recipient in reader) {
|
||||
exportState.recipientIds.add(recipient.id)
|
||||
exportState.groupRecipientIds.add(recipient.id)
|
||||
emitter.emit(Frame(recipient = recipient))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Copyright 2023 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.signal.core.util
|
||||
|
||||
import java.util.Queue
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import kotlin.math.ceil
|
||||
import kotlin.math.floor
|
||||
import kotlin.time.Duration.Companion.nanoseconds
|
||||
import kotlin.time.DurationUnit
|
||||
|
||||
/**
|
||||
* Used to track performance metrics for large clusters of similar events that are happening simultaneously.
|
||||
*
|
||||
* Very similar to [EventTimer], but with no assumptions around threading,
|
||||
*
|
||||
* The timer tracks things at nanosecond granularity, but presents data as fractional milliseconds for readability.
|
||||
*/
|
||||
class ParallelEventTimer {
|
||||
|
||||
val durationsByGroup: MutableMap<String, Queue<Long>> = ConcurrentHashMap()
|
||||
private var startTime = System.nanoTime()
|
||||
|
||||
fun reset() {
|
||||
durationsByGroup.clear()
|
||||
startTime = System.nanoTime()
|
||||
}
|
||||
|
||||
/**
|
||||
* Begin an event associated with a group. You must call [EventStopper.stopEvent] on the returned object in order to indicate the action has completed.
|
||||
*/
|
||||
fun beginEvent(group: String): EventStopper {
|
||||
val start = System.nanoTime()
|
||||
return EventStopper {
|
||||
val duration = System.nanoTime() - start
|
||||
durationsByGroup.computeIfAbsent(group) { ConcurrentLinkedQueue() } += duration
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Time an event associated with a group.
|
||||
*/
|
||||
inline fun <E> timeEvent(group: String, operation: () -> E): E {
|
||||
val start = System.nanoTime()
|
||||
val result = operation()
|
||||
val duration = System.nanoTime() - start
|
||||
durationsByGroup.computeIfAbsent(group) { ConcurrentLinkedQueue() } += duration
|
||||
return result
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the timer and returns a mapping of group -> [EventMetrics], which will tell you various statistics around timings for that group.
|
||||
* It is assumed that all events have been stopped by the time this has been called.
|
||||
*/
|
||||
fun stop(): EventTimerResults {
|
||||
val totalDuration = System.nanoTime() - startTime
|
||||
|
||||
val data: Map<String, EventMetrics> = durationsByGroup
|
||||
.mapValues { entry ->
|
||||
val sorted: List<Long> = entry.value.sorted()
|
||||
|
||||
EventMetrics(
|
||||
totalEventTime = sorted.sum().nanoseconds.toDouble(DurationUnit.MILLISECONDS),
|
||||
eventCount = sorted.size,
|
||||
sortedDurationNanos = sorted
|
||||
)
|
||||
}
|
||||
|
||||
return EventTimerResults(totalDuration.nanoseconds.toDouble(DurationUnit.MILLISECONDS), data)
|
||||
}
|
||||
|
||||
class EventTimerResults(totalWallTime: Double, data: Map<String, EventMetrics>) : Map<String, EventMetrics> by data {
|
||||
val summary by lazy {
|
||||
val builder = StringBuilder()
|
||||
|
||||
builder.append("[overall] totalWallTime: ${totalWallTime.roundedString(2)}, totalEventTime: ${data.values.map { it.totalEventTime}.sum().roundedString(2)} ")
|
||||
|
||||
for (entry in data) {
|
||||
builder.append("[${entry.key}] totalEventTime: ${entry.value.totalEventTime.roundedString(2)}, count: ${entry.value.eventCount}, p50: ${entry.value.p(50)}, p90: ${entry.value.p(90)}, p99: ${entry.value.p(99)} ")
|
||||
}
|
||||
|
||||
builder.toString()
|
||||
}
|
||||
}
|
||||
|
||||
fun interface EventStopper {
|
||||
fun stopEvent()
|
||||
}
|
||||
|
||||
data class EventMetrics(
|
||||
/** The sum of all event times, in fractional milliseconds. If running operations in parallel, this will likely be larger than [totalWallTime]. */
|
||||
val totalEventTime: Double,
|
||||
/** Total number of events observed. */
|
||||
val eventCount: Int,
|
||||
private val sortedDurationNanos: List<Long>
|
||||
) {
|
||||
|
||||
/**
|
||||
* Returns the percentile of the duration data (e.g. p50, p90) as a formatted string containing fractional milliseconds rounded to the requested number of decimal places.
|
||||
*/
|
||||
fun p(percentile: Int, decimalPlaces: Int = 2): String {
|
||||
return pNanos(percentile).nanoseconds.toDouble(DurationUnit.MILLISECONDS).roundedString(decimalPlaces)
|
||||
}
|
||||
|
||||
private fun pNanos(percentile: Int): Long {
|
||||
if (sortedDurationNanos.isEmpty()) {
|
||||
return 0L
|
||||
}
|
||||
|
||||
val index: Float = (percentile / 100f) * (sortedDurationNanos.size - 1)
|
||||
val lowerIndex: Int = floor(index).toInt()
|
||||
val upperIndex: Int = ceil(index).toInt()
|
||||
|
||||
if (lowerIndex == upperIndex) {
|
||||
return sortedDurationNanos[lowerIndex]
|
||||
}
|
||||
|
||||
val interpolationFactor: Float = index - lowerIndex
|
||||
val lowerValue: Long = sortedDurationNanos[lowerIndex]
|
||||
val upperValue: Long = sortedDurationNanos[upperIndex]
|
||||
|
||||
return floor(lowerValue + (upperValue - lowerValue) * interpolationFactor).toLong()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user