mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-04-22 09:49:30 +01:00
Add local metrics for message processing.
This commit is contained in:
@@ -89,6 +89,7 @@ import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
|
||||
import org.thoughtcrime.securesms.util.LinkUtil
|
||||
import org.thoughtcrime.securesms.util.MediaUtil
|
||||
import org.thoughtcrime.securesms.util.MessageConstraintsUtil
|
||||
import org.thoughtcrime.securesms.util.SignalLocalMetrics
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences
|
||||
import org.thoughtcrime.securesms.util.isStory
|
||||
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
|
||||
@@ -119,7 +120,8 @@ object DataMessageProcessor {
|
||||
content: Content,
|
||||
metadata: EnvelopeMetadata,
|
||||
receivedTime: Long,
|
||||
earlyMessageCacheEntry: EarlyMessageCacheEntry?
|
||||
earlyMessageCacheEntry: EarlyMessageCacheEntry?,
|
||||
localMetrics: SignalLocalMetrics.MessageReceive?
|
||||
) {
|
||||
val message: DataMessage = content.dataMessage
|
||||
val groupSecretParams = if (message.hasGroupContext) GroupSecretParams.deriveFromMasterKey(message.groupV2.groupMasterKey) else null
|
||||
@@ -131,6 +133,7 @@ object DataMessageProcessor {
|
||||
if (groupProcessResult == MessageContentProcessorV2.Gv2PreProcessResult.IGNORE) {
|
||||
return
|
||||
}
|
||||
localMetrics?.onGv2Processed()
|
||||
}
|
||||
|
||||
var messageId: MessageId? = null
|
||||
@@ -146,8 +149,8 @@ object DataMessageProcessor {
|
||||
message.hasPayment() -> messageId = handlePayment(context, envelope, metadata, message, senderRecipient.id, receivedTime)
|
||||
message.hasStoryContext() -> messageId = handleStoryReply(context, envelope, metadata, message, senderRecipient, groupId, receivedTime)
|
||||
message.hasGiftBadge() -> messageId = handleGiftMessage(context, envelope, metadata, message, senderRecipient, threadRecipient.id, receivedTime)
|
||||
message.isMediaMessage -> messageId = handleMediaMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime)
|
||||
message.hasBody() -> messageId = handleTextMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime)
|
||||
message.isMediaMessage -> messageId = handleMediaMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime, localMetrics)
|
||||
message.hasBody() -> messageId = handleTextMessage(context, envelope, metadata, message, senderRecipient, threadRecipient, groupId, receivedTime, localMetrics)
|
||||
message.hasGroupCallUpdate() -> handleGroupCallUpdateMessage(envelope, message, senderRecipient.id, groupId)
|
||||
}
|
||||
|
||||
@@ -191,6 +194,8 @@ object DataMessageProcessor {
|
||||
}
|
||||
}
|
||||
}
|
||||
localMetrics?.onPostProcessComplete()
|
||||
localMetrics?.complete(groupId != null)
|
||||
}
|
||||
|
||||
private fun handleProfileKey(
|
||||
@@ -822,7 +827,8 @@ object DataMessageProcessor {
|
||||
senderRecipient: Recipient,
|
||||
threadRecipient: Recipient,
|
||||
groupId: GroupId.V2?,
|
||||
receivedTime: Long
|
||||
receivedTime: Long,
|
||||
localMetrics: SignalLocalMetrics.MessageReceive?
|
||||
): MessageId? {
|
||||
log(envelope.timestamp, "Media message.")
|
||||
|
||||
@@ -871,6 +877,7 @@ object DataMessageProcessor {
|
||||
} finally {
|
||||
SignalDatabase.messages.endTransaction()
|
||||
}
|
||||
localMetrics?.onInsertedMediaMessage()
|
||||
|
||||
return if (insertResult != null) {
|
||||
SignalDatabase.runPostSuccessfulTransaction {
|
||||
@@ -912,7 +919,8 @@ object DataMessageProcessor {
|
||||
senderRecipient: Recipient,
|
||||
threadRecipient: Recipient,
|
||||
groupId: GroupId.V2?,
|
||||
receivedTime: Long
|
||||
receivedTime: Long,
|
||||
localMetrics: SignalLocalMetrics.MessageReceive?
|
||||
): MessageId? {
|
||||
log(envelope.timestamp, "Text message.")
|
||||
|
||||
@@ -936,6 +944,7 @@ object DataMessageProcessor {
|
||||
)
|
||||
|
||||
val insertResult: InsertResult? = SignalDatabase.messages.insertMessageInbox(IncomingEncryptedMessage(textMessage, body)).orNull()
|
||||
localMetrics?.onInsertedTextMessage()
|
||||
|
||||
return if (insertResult != null) {
|
||||
ApplicationDependencies.getMessageNotifier().updateNotification(context, ConversationId.forConversation(insertResult.threadId))
|
||||
|
||||
@@ -295,10 +295,12 @@ class IncomingMessageObserver(private val context: Application) {
|
||||
}
|
||||
|
||||
private fun processMessage(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List<FollowUpOperation> {
|
||||
val localReceiveMetric = SignalLocalMetrics.MessageReceive.start()
|
||||
val result = MessageDecryptor.decrypt(context, bufferedProtocolStore, envelope, serverDeliveredTimestamp)
|
||||
localReceiveMetric.onEnvelopeDecrypted()
|
||||
when (result) {
|
||||
is MessageDecryptor.Result.Success -> {
|
||||
val job = PushProcessMessageJobV2.processOrDefer(messageContentProcessor, result)
|
||||
val job = PushProcessMessageJobV2.processOrDefer(messageContentProcessor, result, localReceiveMetric)
|
||||
if (job != null) {
|
||||
return result.followUpOperations + FollowUpOperation { job }
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.thoughtcrime.securesms.messages
|
||||
|
||||
import android.content.Context
|
||||
import org.signal.core.util.Stopwatch
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.orNull
|
||||
import org.signal.libsignal.protocol.SignalProtocolAddress
|
||||
@@ -41,6 +40,7 @@ import org.thoughtcrime.securesms.recipients.Recipient
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
|
||||
import org.thoughtcrime.securesms.util.FeatureFlags
|
||||
import org.thoughtcrime.securesms.util.SignalLocalMetrics
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences
|
||||
import org.thoughtcrime.securesms.util.Util
|
||||
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
|
||||
@@ -303,12 +303,10 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
* store or enqueue early content jobs if we detect this as being early, to avoid recursive scenarios.
|
||||
*/
|
||||
@JvmOverloads
|
||||
open fun process(envelope: Envelope, content: Content, metadata: EnvelopeMetadata, serverDeliveredTimestamp: Long, processingEarlyContent: Boolean = false) {
|
||||
val stopwatch = Stopwatch("process-content")
|
||||
|
||||
open fun process(envelope: Envelope, content: Content, metadata: EnvelopeMetadata, serverDeliveredTimestamp: Long, processingEarlyContent: Boolean = false, localMetric: SignalLocalMetrics.MessageReceive? = null) {
|
||||
val senderRecipient = Recipient.externalPush(SignalServiceAddress(metadata.sourceServiceId, metadata.sourceE164))
|
||||
|
||||
handleMessage(senderRecipient, envelope, content, metadata, serverDeliveredTimestamp, processingEarlyContent, stopwatch)
|
||||
handleMessage(senderRecipient, envelope, content, metadata, serverDeliveredTimestamp, processingEarlyContent, localMetric)
|
||||
|
||||
val earlyCacheEntries: List<EarlyMessageCacheEntry>? = ApplicationDependencies
|
||||
.getEarlyMessageCache()
|
||||
@@ -318,11 +316,9 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
if (!processingEarlyContent && earlyCacheEntries != null) {
|
||||
log(envelope.timestamp, "Found " + earlyCacheEntries.size + " dependent item(s) that were retrieved earlier. Processing.")
|
||||
for (entry in earlyCacheEntries) {
|
||||
handleMessage(senderRecipient, entry.envelope, entry.content, entry.metadata, entry.serverDeliveredTimestamp, processingEarlyContent = true, stopwatch)
|
||||
handleMessage(senderRecipient, entry.envelope, entry.content, entry.metadata, entry.serverDeliveredTimestamp, processingEarlyContent = true, localMetric = null)
|
||||
}
|
||||
stopwatch.split("early-entries")
|
||||
}
|
||||
stopwatch.stop(TAG)
|
||||
}
|
||||
|
||||
private fun handleMessage(
|
||||
@@ -332,7 +328,7 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
metadata: EnvelopeMetadata,
|
||||
serverDeliveredTimestamp: Long,
|
||||
processingEarlyContent: Boolean,
|
||||
stopwatch: Stopwatch
|
||||
localMetric: SignalLocalMetrics.MessageReceive?
|
||||
) {
|
||||
val threadRecipient = getMessageDestination(content, senderRecipient)
|
||||
|
||||
@@ -345,7 +341,7 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
val receivedTime: Long = handlePendingRetry(pending, envelope.timestamp, threadRecipient)
|
||||
|
||||
log(envelope.timestamp, "Beginning message processing. Sender: " + formatSender(senderRecipient.id, metadata.sourceServiceId, metadata.sourceDeviceId))
|
||||
stopwatch.split("pre-process")
|
||||
localMetric?.onPreProcessComplete()
|
||||
when {
|
||||
content.hasDataMessage() -> {
|
||||
DataMessageProcessor.process(
|
||||
@@ -356,9 +352,9 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
content,
|
||||
metadata,
|
||||
receivedTime,
|
||||
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
|
||||
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp),
|
||||
localMetric
|
||||
)
|
||||
stopwatch.split("data-message")
|
||||
}
|
||||
content.hasSyncMessage() -> {
|
||||
TextSecurePreferences.setMultiDevice(context, true)
|
||||
@@ -371,7 +367,6 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
metadata,
|
||||
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
|
||||
)
|
||||
stopwatch.split("sync-message")
|
||||
}
|
||||
content.hasCallMessage() -> {
|
||||
log(envelope.timestamp, "Got call message...")
|
||||
@@ -395,11 +390,9 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
metadata,
|
||||
if (processingEarlyContent) null else EarlyMessageCacheEntry(envelope, content, metadata, serverDeliveredTimestamp)
|
||||
)
|
||||
stopwatch.split("receipt-message")
|
||||
}
|
||||
content.hasTypingMessage() -> {
|
||||
handleTypingMessage(envelope, metadata, content.typingMessage, senderRecipient)
|
||||
stopwatch.split("typing-message")
|
||||
}
|
||||
content.hasStoryMessage() -> {
|
||||
StoryMessageProcessor.process(
|
||||
@@ -409,7 +402,6 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
senderRecipient,
|
||||
threadRecipient
|
||||
)
|
||||
stopwatch.split("story-message")
|
||||
}
|
||||
content.hasDecryptionErrorMessage() -> {
|
||||
handleRetryReceipt(envelope, metadata, content.decryptionErrorMessage!!.toDecryptionErrorMessage(metadata), senderRecipient)
|
||||
|
||||
Reference in New Issue
Block a user