Add performance metrics for archive uploads.

This commit is contained in:
Greyson Parrelli
2025-08-14 13:33:23 -04:00
committed by Jeffrey Starke
parent 9ccdbb8e40
commit 73ad6221a6
5 changed files with 84 additions and 9 deletions

View File

@@ -15,8 +15,10 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.withContext
import org.signal.core.util.bytes
import org.signal.core.util.logging.Log
import org.signal.core.util.throttleLatest
import org.thoughtcrime.securesms.BuildConfig
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.SignalDatabase
@@ -27,11 +29,13 @@ import org.thoughtcrime.securesms.jobs.BackupMessagesJob
import org.thoughtcrime.securesms.jobs.UploadAttachmentToArchiveJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.keyvalue.protos.ArchiveUploadProgressState
import org.thoughtcrime.securesms.util.SignalLocalMetrics
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.max
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.DurationUnit
/**
* Tracks the progress of uploading your message archive and provides an observable stream of results.
@@ -48,7 +52,11 @@ object ArchiveUploadProgress {
private var uploadProgress: ArchiveUploadProgressState = SignalStore.backup.archiveUploadState ?: PROGRESS_NONE
private val partialMediaProgress: MutableMap<AttachmentId, Long> = ConcurrentHashMap()
private val attachmentProgress: MutableMap<AttachmentId, AttachmentProgressDetails> = ConcurrentHashMap()
private var debugAttachmentStartTime: Long = 0
private var debugTotalAttachments: Int = 0
private var debugTotalBytes: Long = 0
/**
* Observe this to get updates on the current upload progress.
@@ -69,9 +77,10 @@ object ArchiveUploadProgress {
return@map PROGRESS_NONE
}
val pendingMediaUploadBytes = SignalDatabase.attachments.getPendingArchiveUploadBytes() - partialMediaProgress.values.sum()
val pendingMediaUploadBytes = SignalDatabase.attachments.getPendingArchiveUploadBytes() - attachmentProgress.values.sumOf { it.bytesUploaded }
if (pendingMediaUploadBytes <= 0) {
Log.i(TAG, "No more pending bytes. Done!")
Log.d(TAG, buildDebugStats(debugAttachmentStartTime, debugTotalAttachments, debugTotalBytes))
return@map PROGRESS_NONE
}
@@ -155,7 +164,8 @@ object ArchiveUploadProgress {
}
}
fun onAttachmentsStarted(totalAttachmentBytes: Long) {
fun onAttachmentSectionStarted(totalAttachmentBytes: Long) {
debugAttachmentStartTime = System.currentTimeMillis()
updateState {
it.copy(
state = ArchiveUploadProgressState.State.UploadMedia,
@@ -165,13 +175,28 @@ object ArchiveUploadProgress {
}
}
fun onAttachmentStarted(attachmentId: AttachmentId, sizeBytes: Long) {
SignalLocalMetrics.ArchiveAttachmentUpload.start(attachmentId)
attachmentProgress[attachmentId] = AttachmentProgressDetails(startTimeMs = System.currentTimeMillis(), totalBytes = sizeBytes)
_progress.tryEmit(Unit)
}
fun onAttachmentProgress(attachmentId: AttachmentId, bytesUploaded: Long) {
partialMediaProgress[attachmentId] = bytesUploaded
attachmentProgress.getOrPut(attachmentId) { AttachmentProgressDetails() }.bytesUploaded = bytesUploaded
_progress.tryEmit(Unit)
}
fun onAttachmentFinished(attachmentId: AttachmentId) {
partialMediaProgress.remove(attachmentId)
SignalLocalMetrics.ArchiveAttachmentUpload.end(attachmentId)
debugTotalAttachments++
attachmentProgress[attachmentId]?.let {
if (BuildConfig.DEBUG) {
Log.d(TAG, "Attachment finished: $it")
}
debugTotalBytes += it.totalBytes
}
attachmentProgress.remove(attachmentId)
_progress.tryEmit(Unit)
}
@@ -265,4 +290,32 @@ object ArchiveUploadProgress {
}
}
}
private fun buildDebugStats(startTimeMs: Long, totalAttachments: Int, totalBytes: Long): String {
if (debugAttachmentStartTime <= 0 || debugTotalAttachments <= 0 || debugTotalBytes <= 0) {
return "Insufficient data to print debug stats."
}
val seconds: Double = (System.currentTimeMillis() - startTimeMs).milliseconds.toDouble(DurationUnit.SECONDS)
val bytesPerSecond: Long = (totalBytes / seconds).toLong()
return "TotalAttachments=$totalAttachments, TotalBytes=$totalBytes (${totalBytes.bytes.toUnitString()}), Rate=$bytesPerSecond bytes/sec (${bytesPerSecond.bytes.toUnitString()}/sec)"
}
private class AttachmentProgressDetails(
val startTimeMs: Long = 0,
val totalBytes: Long = 0,
var bytesUploaded: Long = 0
) {
override fun toString(): String {
if (startTimeMs == 0L || totalBytes == 0L) {
return "N/A"
}
val seconds: Double = (System.currentTimeMillis() - startTimeMs).milliseconds.toDouble(DurationUnit.SECONDS)
val bytesPerSecond: Long = (bytesUploaded / seconds).toLong()
return "Duration=${System.currentTimeMillis() - startTimeMs}ms, TotalBytes=$totalBytes (${totalBytes.bytes.toUnitString()}), Rate=$bytesPerSecond bytes/sec (${bytesPerSecond.bytes.toUnitString()}/sec)"
}
}
}

View File

@@ -61,7 +61,6 @@ import org.signal.core.util.requireString
import org.signal.core.util.select
import org.signal.core.util.toInt
import org.signal.core.util.update
import org.signal.core.util.updateAll
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.attachments.ArchivedAttachment
import org.thoughtcrime.securesms.attachments.Attachment
@@ -2011,11 +2010,13 @@ class AttachmentTable(
fun clearAllArchiveData() {
writableDatabase
.updateAll(TABLE_NAME)
.update(TABLE_NAME)
.values(
ARCHIVE_CDN to null,
ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value
ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value,
UPLOAD_TIMESTAMP to 0
)
.where("$ARCHIVE_CDN NOT NULL")
.run()
}

View File

@@ -47,7 +47,7 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) :
SignalDatabase.attachments.createRemoteKeyForAttachmentsThatNeedArchiveUpload()
ArchiveUploadProgress.onAttachmentsStarted(SignalDatabase.attachments.getPendingArchiveUploadBytes())
ArchiveUploadProgress.onAttachmentSectionStarted(SignalDatabase.attachments.getPendingArchiveUploadBytes())
if (!isCanceled) {
Log.i(TAG, "Adding ${jobs.size} jobs to backfill attachments.")

View File

@@ -188,6 +188,8 @@ class UploadAttachmentToArchiveJob private constructor(
null
}
ArchiveUploadProgress.onAttachmentStarted(attachmentId, attachment.size)
val attachmentStream = try {
AttachmentUploadUtil.buildSignalServiceAttachmentStream(
context = context,

View File

@@ -6,6 +6,8 @@ import androidx.annotation.MainThread;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.attachments.AttachmentId;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -497,4 +499,21 @@ public final class SignalLocalMetrics {
}
}
}
/**
* Tracks how long it took to upload an attachment to the archive CDN.
*/
public static final class ArchiveAttachmentUpload {
private static final String NAME = "archive-attachment-upload";
/** When the attachment begins uploading. */
public static void start(AttachmentId id) {
LocalMetrics.getInstance().start(NAME, NAME + id);
}
/** When the attachment finishes uploading. */
public static void end(AttachmentId id) {
LocalMetrics.getInstance().end(NAME + id);
}
}
}