Fix OOM in collapse backfill job.

This commit is contained in:
Michelle Tang
2026-04-06 12:15:35 -04:00
committed by GitHub
parent 78e1a407a6
commit 892e6bd853
4 changed files with 60 additions and 55 deletions
@@ -235,6 +235,7 @@ class InternalSettingsFragment : DSLSettingsFragment(R.string.preferences__inter
title = DSLSettingsText.from("Collapse chat updates"),
summary = DSLSettingsText.from("Collapses certain consecutive chat updates - cannot be undone."),
onClick = {
SignalStore.misc.completedCollapsedEventsMigration = false
AppDependencies.jobManager.add(BackfillCollapsedMessageJob())
}
)
@@ -1,7 +1,6 @@
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.core.util.readToList
import org.signal.core.util.requireBlob
import org.signal.core.util.requireBoolean
import org.signal.core.util.requireLong
@@ -16,6 +15,7 @@ import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.BackfillCollapsedMessageJob.Companion.BATCH_SIZE
import org.thoughtcrime.securesms.jobs.protos.BackfillCollapsedMessageJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.util.DateUtils
/**
@@ -51,61 +51,70 @@ class BackfillCollapsedMessageJob private constructor(
override fun getFactoryKey(): String = KEY
override fun run(): Result {
if (SignalStore.misc.completedCollapsedEventsMigration) {
Log.i(TAG, "Already completed migration")
return Result.success()
}
val db = SignalDatabase.rawDatabase
val messages = db
.select(MessageTable.ID, MessageTable.THREAD_ID, MessageTable.DATE_RECEIVED, MessageTable.TYPE, MessageTable.READ, MessageTable.COLLAPSED_STATE, MessageTable.MESSAGE_EXTRAS)
var messageCount = 0
var lastProcessedDateReceived = lastDateReceived
// Tracks the last/previous message to compare against the current message when determining collapsed state
val lastMessageByThread = mutableMapOf<Long, LastMessage?>()
db
.select(MessageTable.ID, MessageTable.THREAD_ID, MessageTable.DATE_RECEIVED, MessageTable.TYPE, MessageTable.READ, MessageTable.MESSAGE_EXTRAS)
.from(MessageTable.TABLE_NAME)
.where("${MessageTable.DATE_RECEIVED} > ?", lastDateReceived)
.orderBy("${MessageTable.DATE_RECEIVED}, ${MessageTable.ID}")
.limit(BATCH_SIZE)
.run()
.readToList { cursor ->
PotentialCollapsibleMessage(
id = cursor.requireLong(MessageTable.ID),
threadId = cursor.requireLong(MessageTable.THREAD_ID),
type = cursor.requireLong(MessageTable.TYPE),
dateReceived = cursor.requireLong(MessageTable.DATE_RECEIVED),
collapsedState = cursor.requireLong(MessageTable.COLLAPSED_STATE),
read = cursor.requireBoolean(MessageTable.READ),
messageExtras = cursor.requireBlob(MessageTable.MESSAGE_EXTRAS)?.let { MessageExtras.ADAPTER.decode(it) }
)
}
.use { cursor ->
while (cursor.moveToNext()) {
val id = cursor.requireLong(MessageTable.ID)
val threadId = cursor.requireLong(MessageTable.THREAD_ID)
val type = cursor.requireLong(MessageTable.TYPE)
val dateReceived = cursor.requireLong(MessageTable.DATE_RECEIVED)
val read = cursor.requireBoolean(MessageTable.READ)
val messageExtras = cursor.requireBlob(MessageTable.MESSAGE_EXTRAS)?.let { MessageExtras.ADAPTER.decode(it) }
// Tracks the last/previous message to compare against the current message when determining collapsed state
val lastMessageByThread = mutableMapOf<Long, LastMessage?>()
for (message in messages) {
val collapsibleType = CollapsibleEvents.getCollapsibleType(message.type, message.messageExtras)
val collapsibleType = CollapsibleEvents.getCollapsibleType(type, messageExtras)
if (collapsibleType == null) {
lastMessageByThread[message.threadId] = null
} else {
val previous = lastMessageByThread[message.threadId]
if (collapsibleType == null) {
lastMessageByThread[threadId] = null
} else {
val previous = lastMessageByThread[threadId]
val (collapsedState, headId, size) = if ((previous?.collapsibleType == collapsibleType) && DateUtils.isSameDay(previous.dateReceived, message.dateReceived) && previous.collapsedSetSize < CollapsibleEvents.MAX_SIZE) {
val state = if (message.read) CollapsedState.COLLAPSED.id else CollapsedState.PENDING_COLLAPSED.id
Triple(state, previous.headId, previous.collapsedSetSize)
} else {
Triple(CollapsedState.HEAD_COLLAPSED.id, message.id, 0)
val (collapsedState, headId, size) = if ((previous?.collapsibleType == collapsibleType) && DateUtils.isSameDay(previous.dateReceived, dateReceived) && previous.collapsedSetSize < CollapsibleEvents.MAX_SIZE) {
val state = if (read) CollapsedState.COLLAPSED.id else CollapsedState.PENDING_COLLAPSED.id
Triple(state, previous.headId, previous.collapsedSetSize)
} else {
Triple(CollapsedState.HEAD_COLLAPSED.id, id, 0)
}
db.update(MessageTable.TABLE_NAME)
.values(
MessageTable.COLLAPSED_STATE to collapsedState,
MessageTable.COLLAPSED_HEAD_ID to headId
)
.where("${MessageTable.ID} = ?", id)
.run()
lastMessageByThread[threadId] = LastMessage(collapsibleType, headId, dateReceived, size + 1)
}
messageCount++
lastProcessedDateReceived = dateReceived
}
db.update(MessageTable.TABLE_NAME)
.values(
MessageTable.COLLAPSED_STATE to collapsedState,
MessageTable.COLLAPSED_HEAD_ID to headId
)
.where("${MessageTable.ID} = ?", message.id)
.run()
lastMessageByThread[message.threadId] = LastMessage(collapsibleType, headId, message.dateReceived, size + 1)
}
}
if (messages.isEmpty() || messages.size != BATCH_SIZE) {
if (messageCount == 0 || messageCount != BATCH_SIZE) {
Log.i(TAG, "Finished processing all messages, backfill is completed")
SignalStore.misc.completedCollapsedEventsMigration = true
} else {
val dateReceived = messages.last().dateReceived
Log.i(TAG, "Processed ${messages.size} messages, up to time $dateReceived. Re-enqueuing job")
AppDependencies.jobManager.add(BackfillCollapsedMessageJob(lastDateReceived = dateReceived))
Log.i(TAG, "Processed $messageCount messages, up to time $lastProcessedDateReceived. Re-enqueuing job")
AppDependencies.jobManager.add(BackfillCollapsedMessageJob(lastDateReceived = lastProcessedDateReceived))
}
return Result.success()
@@ -115,19 +124,6 @@ class BackfillCollapsedMessageJob private constructor(
Log.w(TAG, "Failed to backfill collapsed messages. Time of last processed message: $lastDateReceived")
}
/**
* Data required from a message to know if it collapsible
*/
private data class PotentialCollapsibleMessage(
val id: Long,
val threadId: Long,
val type: Long,
val dateReceived: Long,
val collapsedState: Long,
val read: Boolean,
val messageExtras: MessageExtras?
)
/**
* Information about the previous message, used when deciding the collapsible state of the next
*/
@@ -47,6 +47,7 @@ class MiscellaneousValues internal constructor(store: KeyValueStore) : SignalSto
private const val HAS_KEY_TRANSPARENCY_FAILURE = "misc.has_key_transparency_failure"
private const val HAS_SEEN_KEY_TRANSPARENCY_FAILURE = "misc.has_seen_key_transparency_failure"
private const val CAMERA_FACING_FRONT = "misc.camera_facing_front"
private const val COMPLETED_COLLAPSED_EVENTS_MIGRATION = "misc.completed_collapsed_events_migration"
}
public override fun onFirstEverAppLaunch() {
@@ -315,4 +316,6 @@ class MiscellaneousValues internal constructor(store: KeyValueStore) : SignalSto
* Whether or not the preferred camera direction is front-facing.
*/
var isCameraFacingFront: Boolean by booleanValue(CAMERA_FACING_FRONT, true)
var completedCollapsedEventsMigration: Boolean by booleanValue(COMPLETED_COLLAPSED_EVENTS_MIGRATION, false)
}
@@ -197,9 +197,10 @@ public class ApplicationMigrations {
static final int RELEASE_CHANNEL_RECIPIENT_FIX = 153;
static final int EMOJI_VERSION_13 = 154;
static final int COLLAPSED_EVENTS = 155;
static final int COLLAPSED_EVENTS_2 = 156;
}
public static final int CURRENT_VERSION = 155;
public static final int CURRENT_VERSION = 156;
/**
* This *must* be called after the {@link JobManager} has been instantiated, but *before* the call
@@ -914,6 +915,10 @@ public class ApplicationMigrations {
jobs.put(Version.COLLAPSED_EVENTS, new BackfillCollapsedEventsMigrationJob());
}
if (lastSeenVersion < Version.COLLAPSED_EVENTS_2) {
jobs.put(Version.COLLAPSED_EVENTS_2, new BackfillCollapsedEventsMigrationJob());
}
return jobs;
}