Backfill collapsible messages.

This commit is contained in:
Michelle Tang
2026-03-25 10:27:35 -04:00
committed by Cody Henthorne
parent 75df16e842
commit 4f260c2063
6 changed files with 188 additions and 7 deletions

View File

@@ -110,6 +110,7 @@ import org.thoughtcrime.securesms.jobs.ArchiveAttachmentBackfillJob
import org.thoughtcrime.securesms.jobs.ArchiveThumbnailBackfillJob
import org.thoughtcrime.securesms.jobs.ArchiveThumbnailUploadJob
import org.thoughtcrime.securesms.jobs.AvatarGroupsV2DownloadJob
import org.thoughtcrime.securesms.jobs.BackfillCollapsedMessageJob
import org.thoughtcrime.securesms.jobs.BackupDeleteJob
import org.thoughtcrime.securesms.jobs.BackupMessagesJob
import org.thoughtcrime.securesms.jobs.BackupRestoreMediaJob
@@ -1500,6 +1501,10 @@ object BackupRepository {
AppDependencies.jobManager.addAll(groupJobs)
stopwatch.split("group-jobs")
if (RemoteConfig.collapseEvents) {
AppDependencies.jobManager.add(BackfillCollapsedMessageJob())
}
SignalStore.backup.firstAppVersion = header.firstAppVersion
SignalStore.internal.importedBackupDebugInfo = header.debugInfo.let { BackupDebugInfo.ADAPTER.decodeOrNull(it.toByteArray()) }

View File

@@ -49,6 +49,7 @@ import org.thoughtcrime.securesms.database.model.InAppPaymentSubscriberRecord
import org.thoughtcrime.securesms.database.model.MessageRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.JobTracker
import org.thoughtcrime.securesms.jobs.BackfillCollapsedMessageJob
import org.thoughtcrime.securesms.jobs.CheckKeyTransparencyJob
import org.thoughtcrime.securesms.jobs.DownloadLatestEmojiDataJob
import org.thoughtcrime.securesms.jobs.EmojiSearchIndexDownloadJob
@@ -230,6 +231,16 @@ class InternalSettingsFragment : DSLSettingsFragment(R.string.preferences__inter
}
)
clickPref(
title = DSLSettingsText.from("Collapse chat updates"),
summary = DSLSettingsText.from("Collapses certain consecutive chat updates - cannot be undone."),
onClick = {
AppDependencies.jobManager.add(BackfillCollapsedMessageJob())
}
)
dividerPref()
sectionHeaderPref(DSLSettingsText.from("Playgrounds"))
clickPref(

View File

@@ -3620,18 +3620,22 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
}
}
// TODO(michelle): Maybe reduce to the fields you actually need instead of everything
private fun getMessageDirectlyBefore(messageId: Long, threadId: Long, dateReceived: Long): MessageRecord? {
val message = readableDatabase
.select(*MMS_PROJECTION)
private fun getMessageDirectlyBefore(messageId: Long, threadId: Long, dateReceived: Long): PotentialCollapsibleMessage? {
return readableDatabase
.select(DATE_RECEIVED, TYPE, COLLAPSED_HEAD_ID, MESSAGE_EXTRAS)
.from(TABLE_NAME)
.where("$ID < ? AND $THREAD_ID = ?", messageId, threadId)
.orderBy("$DATE_RECEIVED DESC")
.limit(1)
.run()
.readToSingleObject { MmsReader(it).getCurrent() }
return message?.takeIf { DateUtils.isSameDay(message.dateReceived, dateReceived) }
.readToSingleObject { cursor ->
PotentialCollapsibleMessage(
type = cursor.requireLong(TYPE),
dateReceived = cursor.requireLong(DATE_RECEIVED),
collapsedHeadId = cursor.requireLong(COLLAPSED_HEAD_ID),
messageExtras = cursor.requireBlob(MESSAGE_EXTRAS)?.let { MessageExtras.ADAPTER.decode(it) }
)
}?.takeIf { DateUtils.isSameDay(it.dateReceived, dateReceived) }
}
private fun hasAudioAttachment(attachments: List<Attachment>): Boolean {
@@ -6548,6 +6552,16 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
private val author: RecipientId
)
/**
* All the data required to calculate if a message is collapsible
*/
private data class PotentialCollapsibleMessage(
val type: Long,
val dateReceived: Long,
val collapsedHeadId: Long,
val messageExtras: MessageExtras?
)
private class TimestampReadResult(
val expiring: List<Pair<Long, Long>>,
val threads: List<Long>

View File

@@ -0,0 +1,146 @@
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.core.util.readToList
import org.signal.core.util.requireBlob
import org.signal.core.util.requireBoolean
import org.signal.core.util.requireLong
import org.signal.core.util.select
import org.signal.core.util.update
import org.thoughtcrime.securesms.database.CollapsedState
import org.thoughtcrime.securesms.database.CollapsibleEvents
import org.thoughtcrime.securesms.database.MessageTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.databaseprotos.MessageExtras
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.BackfillCollapsedMessageJob.Companion.BATCH_SIZE
import org.thoughtcrime.securesms.jobs.protos.BackfillCollapsedMessageJobData
import org.thoughtcrime.securesms.util.DateUtils
/**
* Backfills the collapsed state of chat events. Runs for [BATCH_SIZE] messages, then re-enqueues itself
* to allow other jobs to run, until the backfill is complete.
*/
class BackfillCollapsedMessageJob private constructor(
private val lastDateReceived: Long,
parameters: Parameters
) : Job(parameters) {
companion object {
const val KEY = "BackfillCollapsedMessageJob"
val TAG = Log.tag(BackfillCollapsedMessageJob::class)
private const val BATCH_SIZE = 5_000
}
constructor() : this(lastDateReceived = 1)
constructor(lastDateReceived: Long) : this(
lastDateReceived = lastDateReceived,
parameters = Parameters.Builder()
.setQueue(KEY)
.setMaxAttempts(Parameters.UNLIMITED)
.setGlobalPriority(Parameters.PRIORITY_LOWER)
.build()
)
override fun serialize(): ByteArray {
return BackfillCollapsedMessageJobData(lastDateReceived = lastDateReceived).encode()
}
override fun getFactoryKey(): String = KEY
override fun run(): Result {
val db = SignalDatabase.rawDatabase
val messages = db
.select(MessageTable.ID, MessageTable.THREAD_ID, MessageTable.DATE_RECEIVED, MessageTable.TYPE, MessageTable.READ, MessageTable.COLLAPSED_STATE, MessageTable.MESSAGE_EXTRAS)
.from(MessageTable.TABLE_NAME)
.where("${MessageTable.DATE_RECEIVED} > ?", lastDateReceived)
.orderBy("${MessageTable.DATE_RECEIVED}, ${MessageTable.ID}")
.limit(BATCH_SIZE)
.run()
.readToList { cursor ->
PotentialCollapsibleMessage(
id = cursor.requireLong(MessageTable.ID),
threadId = cursor.requireLong(MessageTable.THREAD_ID),
type = cursor.requireLong(MessageTable.TYPE),
dateReceived = cursor.requireLong(MessageTable.DATE_RECEIVED),
collapsedState = cursor.requireLong(MessageTable.COLLAPSED_STATE),
read = cursor.requireBoolean(MessageTable.READ),
messageExtras = cursor.requireBlob(MessageTable.MESSAGE_EXTRAS)?.let { MessageExtras.ADAPTER.decode(it) }
)
}
// Tracks the last/previous message to compare against the current message when determining collapsed state
val lastMessageByThread = mutableMapOf<Long, LastMessage?>()
for (message in messages) {
val collapsibleType = CollapsibleEvents.getCollapsibleType(message.type, message.messageExtras)
if (collapsibleType == null) {
lastMessageByThread[message.threadId] = null
} else {
val previous = lastMessageByThread[message.threadId]
val (collapsedState, headId) = if ((previous?.collapsibleType == collapsibleType) && DateUtils.isSameDay(previous.dateReceived, message.dateReceived)) {
val state = if (message.read) CollapsedState.COLLAPSED.id else CollapsedState.PENDING_COLLAPSED.id
Pair(state, previous.headId)
} else {
Pair(CollapsedState.HEAD_COLLAPSED.id, message.id)
}
db.update(MessageTable.TABLE_NAME)
.values(
MessageTable.COLLAPSED_STATE to collapsedState,
MessageTable.COLLAPSED_HEAD_ID to headId
)
.where("${MessageTable.ID} = ?", message.id)
.run()
lastMessageByThread[message.threadId] = LastMessage(collapsibleType, headId, message.dateReceived)
}
}
if (messages.isEmpty() || messages.size != BATCH_SIZE) {
Log.i(TAG, "Finished processing all messages, backfill is completed")
} else {
val dateReceived = messages.last().dateReceived
Log.i(TAG, "Processed ${messages.size} messages, up to time $dateReceived. Re-enqueuing job")
AppDependencies.jobManager.add(BackfillCollapsedMessageJob(lastDateReceived = dateReceived))
}
return Result.success()
}
override fun onFailure() {
Log.w(TAG, "Failed to backfill collapsed messages. Time of last processed message: $lastDateReceived")
}
/**
* Data required from a message to know if it collapsible
*/
private data class PotentialCollapsibleMessage(
val id: Long,
val threadId: Long,
val type: Long,
val dateReceived: Long,
val collapsedState: Long,
val read: Boolean,
val messageExtras: MessageExtras?
)
/**
* Information about the previous message, used when deciding the collapsible state of the next
*/
private data class LastMessage(
val collapsibleType: CollapsibleEvents.CollapsibleType?,
val headId: Long,
val dateReceived: Long
)
class Factory : Job.Factory<BackfillCollapsedMessageJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): BackfillCollapsedMessageJob {
val data = BackfillCollapsedMessageJobData.ADAPTER.decode(serializedData!!)
return BackfillCollapsedMessageJob(lastDateReceived = data.lastDateReceived, parameters = parameters)
}
}
}

View File

@@ -146,6 +146,7 @@ public final class JobManagerFactories {
put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory());
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());
put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory());
put(BackfillCollapsedMessageJob.KEY, new BackfillCollapsedMessageJob.Factory());
put(BackfillDigestsForDataFileJob.KEY, new BackfillDigestsForDataFileJob.Factory());
put(BackupDeleteJob.KEY, new BackupDeleteJob.Factory());
put(BackupMessagesJob.KEY, new BackupMessagesJob.Factory());

View File

@@ -118,6 +118,10 @@ message RestoreLocalAttachmentJobData {
uint64 fileSize = 4;
}
message BackfillCollapsedMessageJobData {
int64 lastDateReceived = 1;
}
message BackfillDigestJobData {
uint64 attachmentId = 1;
}