Add sticker specific restore flow and fix archive related sticker bugs.

This commit is contained in:
Cody Henthorne
2025-08-27 09:39:12 -04:00
committed by Michelle Tang
parent 9903a664d4
commit 21363f085e
15 changed files with 738 additions and 70 deletions

View File

@@ -0,0 +1,78 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.attachments
import android.net.Uri
import android.os.Parcel
import androidx.core.os.ParcelCompat
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.model.StickerRecord
import org.thoughtcrime.securesms.mms.StickerSlide
import org.thoughtcrime.securesms.stickers.StickerLocator
import java.security.SecureRandom
/**
* An incoming sticker that is already available locally via an installed sticker pack.
*/
class LocalStickerAttachment : Attachment {
constructor(
stickerRecord: StickerRecord,
stickerLocator: StickerLocator
) : super(
contentType = stickerRecord.contentType,
transferState = AttachmentTable.TRANSFER_PROGRESS_DONE,
size = stickerRecord.size,
fileName = null,
cdn = Cdn.CDN_0,
remoteLocation = null,
remoteKey = null,
remoteDigest = null,
incrementalDigest = null,
fastPreflightId = SecureRandom().nextLong().toString(),
voiceNote = false,
borderless = false,
videoGif = false,
width = StickerSlide.WIDTH,
height = StickerSlide.HEIGHT,
incrementalMacChunkSize = 0,
quote = false,
uploadTimestamp = 0,
caption = null,
stickerLocator = stickerLocator,
blurHash = null,
audioHash = null,
transformProperties = null,
uuid = null
) {
uri = stickerRecord.uri
}
@Suppress("unused")
constructor(parcel: Parcel) : super(parcel) {
uri = ParcelCompat.readParcelable(parcel, Uri::class.java.classLoader, Uri::class.java)!!
}
override val uri: Uri
override val publicUri: Uri? = null
override val thumbnailUri: Uri? = null
val packId: String = stickerLocator!!.packId
val stickerId: Int = stickerLocator!!.stickerId
override fun writeToParcel(dest: Parcel, flags: Int) {
super.writeToParcel(dest, flags)
dest.writeParcelable(uri, 0)
}
override fun equals(other: Any?): Boolean {
return other != null && other is LocalStickerAttachment && other.uri == uri
}
override fun hashCode(): Int {
return uri.hashCode()
}
}

View File

@@ -91,6 +91,7 @@ import org.thoughtcrime.securesms.database.OneTimePreKeyTable
import org.thoughtcrime.securesms.database.SearchTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.SignedPreKeyTable
import org.thoughtcrime.securesms.database.StickerTable
import org.thoughtcrime.securesms.database.ThreadTable
import org.thoughtcrime.securesms.database.model.InAppPaymentSubscriberRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
@@ -110,6 +111,7 @@ import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob
import org.thoughtcrime.securesms.jobs.RestoreAttachmentJob
import org.thoughtcrime.securesms.jobs.RestoreOptimizedMediaJob
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
import org.thoughtcrime.securesms.keyvalue.BackupValues.ArchiveServiceCredentials
import org.thoughtcrime.securesms.keyvalue.KeyValueStore
import org.thoughtcrime.securesms.keyvalue.SignalStore
@@ -1374,6 +1376,17 @@ object BackupRepository {
AppDependencies.recipientCache.warmUp()
SignalDatabase.threads.clearCache()
val stickerJobs = SignalDatabase.stickers.getAllStickerPacks().use { cursor ->
val reader = StickerTable.StickerPackRecordReader(cursor)
reader
.filter { it.isInstalled }
.map {
StickerPackDownloadJob.forInstall(it.packId, it.packKey, false)
}
}
AppDependencies.jobManager.addAll(stickerJobs)
stopwatch.split("sticker-jobs")
val recipientIds = SignalDatabase.threads.getRecentConversationList(
limit = RECENT_RECIPIENTS_MAX,
includeInactiveGroups = false,
@@ -1391,6 +1404,7 @@ object BackupRepository {
}
RetrieveProfileJob.enqueue(recipientIds, skipDebounce = false)
stopwatch.split("profile-jobs")
AppDependencies.jobManager.add(CreateReleaseChannelJob.create())

View File

@@ -18,8 +18,6 @@ import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.StickerTable
import org.thoughtcrime.securesms.database.StickerTable.StickerPackRecordReader
import org.thoughtcrime.securesms.database.model.StickerPackRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
import java.io.IOException
private val TAG = Log.tag(StickerArchiveProcessor::class)
@@ -55,14 +53,6 @@ object StickerArchiveProcessor {
StickerTable.FILE_PATH to ""
)
.run(SQLiteDatabase.CONFLICT_IGNORE)
AppDependencies.jobManager.add(
StickerPackDownloadJob.forInstall(
Hex.toStringCondensed(stickerPack.packId.toByteArray()),
Hex.toStringCondensed(stickerPack.packKey.toByteArray()),
false
)
)
}
}

View File

@@ -67,6 +67,7 @@ import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.Cdn
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.attachments.LocalStickerAttachment
import org.thoughtcrime.securesms.attachments.WallpaperAttachment
import org.thoughtcrime.securesms.audio.AudioHash
import org.thoughtcrime.securesms.backup.v2.exporters.ChatItemArchiveExporter
@@ -532,7 +533,7 @@ class AttachmentTable(
fun getLast30DaysOfRestorableAttachments(batchSize: Int): List<RestorableAttachment> {
val thirtyDaysAgo = System.currentTimeMillis().milliseconds - 30.days
return readableDatabase
.select("$TABLE_NAME.$ID", MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY)
.select("$TABLE_NAME.$ID", MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY, STICKER_PACK_ID)
.from("$TABLE_NAME INNER JOIN ${MessageTable.TABLE_NAME} ON ${MessageTable.TABLE_NAME}.${MessageTable.ID} = $TABLE_NAME.$MESSAGE_ID")
.where("$TRANSFER_STATE = ? AND ${MessageTable.TABLE_NAME}.${MessageTable.DATE_RECEIVED} >= ?", TRANSFER_NEEDS_RESTORE, thirtyDaysAgo.inWholeMilliseconds)
.limit(batchSize)
@@ -544,7 +545,8 @@ class AttachmentTable(
mmsId = it.requireLong(MESSAGE_ID),
size = it.requireLong(DATA_SIZE),
plaintextHash = it.requireString(DATA_HASH_END)?.let { hash -> Base64.decode(hash) },
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) }
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) },
stickerPackId = it.requireString(STICKER_PACK_ID)
)
}
}
@@ -556,7 +558,7 @@ class AttachmentTable(
fun getOlderRestorableAttachments(batchSize: Int): List<RestorableAttachment> {
val thirtyDaysAgo = System.currentTimeMillis().milliseconds - 30.days
return readableDatabase
.select("$TABLE_NAME.$ID", MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY)
.select("$TABLE_NAME.$ID", MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY, STICKER_PACK_ID)
.from("$TABLE_NAME INNER JOIN ${MessageTable.TABLE_NAME} ON ${MessageTable.TABLE_NAME}.${MessageTable.ID} = $TABLE_NAME.$MESSAGE_ID")
.where("$TRANSFER_STATE = ? AND ${MessageTable.TABLE_NAME}.${MessageTable.DATE_RECEIVED} < ?", TRANSFER_NEEDS_RESTORE, thirtyDaysAgo.inWholeMilliseconds)
.limit(batchSize)
@@ -568,14 +570,15 @@ class AttachmentTable(
mmsId = it.requireLong(MESSAGE_ID),
size = it.requireLong(DATA_SIZE),
plaintextHash = it.requireString(DATA_HASH_END)?.let { hash -> Base64.decode(hash) },
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) }
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) },
stickerPackId = it.requireString(STICKER_PACK_ID)
)
}
}
fun getRestorableAttachments(batchSize: Int): List<RestorableAttachment> {
return readableDatabase
.select(ID, MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY)
.select(ID, MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY, STICKER_PACK_ID)
.from(TABLE_NAME)
.where("$TRANSFER_STATE = ?", TRANSFER_NEEDS_RESTORE)
.limit(batchSize)
@@ -587,14 +590,15 @@ class AttachmentTable(
mmsId = it.requireLong(MESSAGE_ID),
size = it.requireLong(DATA_SIZE),
plaintextHash = it.requireString(DATA_HASH_END)?.let { hash -> Base64.decode(hash) },
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) }
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) },
stickerPackId = it.requireString(STICKER_PACK_ID)
)
}
}
fun getRestorableOptimizedAttachments(): List<RestorableAttachment> {
return readableDatabase
.select(ID, MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY)
.select(ID, MESSAGE_ID, DATA_SIZE, DATA_HASH_END, REMOTE_KEY, STICKER_PACK_ID)
.from(TABLE_NAME)
.where("$TRANSFER_STATE = ? AND $DATA_HASH_END NOT NULL AND $REMOTE_KEY NOT NULL", TRANSFER_RESTORE_OFFLOADED)
.orderBy("$ID DESC")
@@ -605,7 +609,8 @@ class AttachmentTable(
mmsId = it.requireLong(MESSAGE_ID),
size = it.requireLong(DATA_SIZE),
plaintextHash = it.requireString(DATA_HASH_END)?.let { hash -> Base64.decode(hash) },
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) }
remoteKey = it.requireString(REMOTE_KEY)?.let { key -> Base64.decode(key) },
stickerPackId = it.requireString(STICKER_PACK_ID)
)
}
}
@@ -1760,6 +1765,7 @@ class AttachmentTable(
val insertedAttachments: MutableMap<Attachment, AttachmentId> = mutableMapOf()
for (attachment in attachments) {
val attachmentId = when {
attachment is LocalStickerAttachment -> insertLocalStickerAttachment(mmsId, attachment)
attachment.uri != null -> insertAttachmentWithData(mmsId, attachment, attachment.quote)
attachment is ArchivedAttachment -> insertArchivedAttachment(mmsId, attachment, attachment.quote)
else -> insertUndownloadedAttachment(mmsId, attachment, attachment.quote)
@@ -2460,8 +2466,103 @@ class AttachmentTable(
}
/**
* Inserts an attachment with existing data. This is likely an outgoing attachment that we're in the process of sending or
* an incoming sticker we have already downloaded.
* Inserts an incoming sticker with pre-existing local data (i.e., the sticker pack is installed).
*/
@Throws(MmsException::class)
private fun insertLocalStickerAttachment(messageId: Long, stickerAttachment: LocalStickerAttachment): AttachmentId {
Log.d(TAG, "[insertLocalStickerAttachment] Inserting attachment for messageId $messageId. (MessageId: $messageId, ${stickerAttachment.uri})")
// find sticker record and reuse
var attachmentId: AttachmentId? = null
writableDatabase.withinTransaction { db ->
val match = db.select()
.from(TABLE_NAME)
.where("$DATA_FILE NOT NULL AND $DATA_RANDOM NOT NULL AND $STICKER_PACK_ID = ? AND $STICKER_ID = ?", stickerAttachment.packId, stickerAttachment.stickerId)
.run()
.readToSingleObject {
it.readAttachment() to it.readDataFileInfo()!!
}
if (match != null) {
val (attachment, dataFileInfo) = match
Log.i(TAG, "[insertLocalStickerAttachment] Found that the sticker matches an existing sticker attachment: ${attachment.attachmentId}. Using all of it's fields. (MessageId: $messageId, ${attachment.uri})")
val contentValues = ContentValues().apply {
put(MESSAGE_ID, messageId)
put(CONTENT_TYPE, attachment.contentType)
put(REMOTE_KEY, attachment.remoteKey)
put(REMOTE_LOCATION, attachment.remoteLocation)
put(REMOTE_DIGEST, attachment.remoteDigest)
put(CDN_NUMBER, attachment.cdn.serialize())
put(TRANSFER_STATE, attachment.transferState)
put(DATA_FILE, dataFileInfo.file.absolutePath)
put(DATA_SIZE, attachment.size)
put(DATA_RANDOM, dataFileInfo.random)
put(FAST_PREFLIGHT_ID, stickerAttachment.fastPreflightId)
put(WIDTH, attachment.width)
put(HEIGHT, attachment.height)
put(STICKER_PACK_ID, attachment.stickerLocator!!.packId)
put(STICKER_PACK_KEY, attachment.stickerLocator.packKey)
put(STICKER_ID, attachment.stickerLocator.stickerId)
put(STICKER_EMOJI, attachment.stickerLocator.emoji)
put(BLUR_HASH, attachment.blurHash?.hash)
put(UPLOAD_TIMESTAMP, attachment.uploadTimestamp)
put(DATA_HASH_START, dataFileInfo.hashStart)
put(DATA_HASH_END, dataFileInfo.hashEnd ?: dataFileInfo.hashStart)
put(ARCHIVE_CDN, attachment.archiveCdn)
put(ARCHIVE_TRANSFER_STATE, attachment.archiveTransferState.value)
put(THUMBNAIL_RESTORE_STATE, dataFileInfo.thumbnailRestoreState)
put(THUMBNAIL_RANDOM, dataFileInfo.thumbnailRandom)
put(THUMBNAIL_FILE, dataFileInfo.thumbnailFile)
put(ATTACHMENT_UUID, stickerAttachment.uuid?.toString())
}
val rowId = db.insert(TABLE_NAME, null, contentValues)
attachmentId = AttachmentId(rowId)
}
}
if (attachmentId == null) {
val dataStream = try {
PartAuthority.getAttachmentStream(context, stickerAttachment.uri)
} catch (e: IOException) {
throw MmsException(e)
}
val fileWriteResult: DataFileWriteResult = writeToDataFile(newDataFile(context), dataStream, stickerAttachment.transformProperties ?: TransformProperties.empty())
Log.d(TAG, "[insertLocalStickerAttachment] Wrote data to file: ${fileWriteResult.file.absolutePath} (MessageId: $messageId, ${stickerAttachment.uri})")
val remoteKey = Util.getSecretBytes(64)
val contentValues = ContentValues().apply {
put(MESSAGE_ID, messageId)
put(CONTENT_TYPE, stickerAttachment.contentType)
put(REMOTE_KEY, Base64.encodeWithPadding(remoteKey))
put(TRANSFER_STATE, stickerAttachment.transferState)
put(DATA_FILE, fileWriteResult.file.absolutePath)
put(DATA_SIZE, fileWriteResult.length)
put(DATA_RANDOM, fileWriteResult.random)
put(FAST_PREFLIGHT_ID, stickerAttachment.fastPreflightId)
put(WIDTH, stickerAttachment.width)
put(HEIGHT, stickerAttachment.height)
put(STICKER_PACK_ID, stickerAttachment.stickerLocator!!.packId)
put(STICKER_PACK_KEY, stickerAttachment.stickerLocator.packKey)
put(STICKER_ID, stickerAttachment.stickerLocator.stickerId)
put(STICKER_EMOJI, stickerAttachment.stickerLocator.emoji)
put(DATA_HASH_START, fileWriteResult.hash)
put(DATA_HASH_END, fileWriteResult.hash)
put(ATTACHMENT_UUID, stickerAttachment.uuid?.toString())
}
val rowId = writableDatabase.insert(TABLE_NAME, null, contentValues)
attachmentId = AttachmentId(rowId)
}
return attachmentId
}
/**
* Inserts an attachment with existing data. This is likely an outgoing attachment that we're in the process of sending.
*/
@Throws(MmsException::class)
private fun insertAttachmentWithData(messageId: Long, attachment: Attachment, quote: Boolean): AttachmentId {
@@ -3138,7 +3239,8 @@ class AttachmentTable(
val mmsId: Long,
val size: Long,
val plaintextHash: ByteArray?,
val remoteKey: ByteArray?
val remoteKey: ByteArray?,
val stickerPackId: String?
) {
override fun equals(other: Any?): Boolean {
return this === other || attachmentId == (other as? RestorableAttachment)?.attachmentId

View File

@@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.database
import android.content.Context
import android.database.Cursor
import androidx.core.content.contentValuesOf
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.StreamUtil
import org.signal.core.util.delete
@@ -98,23 +99,37 @@ class StickerTable(
fun insertSticker(sticker: IncomingSticker, dataStream: InputStream, notify: Boolean) {
val fileInfo: FileInfo = saveStickerImage(dataStream)
writableDatabase
.insertInto(TABLE_NAME)
.values(
PACK_ID to sticker.packId,
PACK_KEY to sticker.packKey,
PACK_TITLE to sticker.packTitle,
PACK_AUTHOR to sticker.packAuthor,
STICKER_ID to sticker.stickerId,
EMOJI to sticker.emoji,
CONTENT_TYPE to sticker.contentType,
COVER to if (sticker.isCover) 1 else 0,
INSTALLED to if (sticker.isInstalled) 1 else 0,
FILE_PATH to fileInfo.file.absolutePath,
FILE_LENGTH to fileInfo.length,
FILE_RANDOM to fileInfo.random
)
.run(SQLiteDatabase.CONFLICT_REPLACE)
val values = contentValuesOf(
PACK_ID to sticker.packId,
PACK_KEY to sticker.packKey,
PACK_TITLE to sticker.packTitle,
PACK_AUTHOR to sticker.packAuthor,
STICKER_ID to sticker.stickerId,
EMOJI to sticker.emoji,
CONTENT_TYPE to sticker.contentType,
COVER to if (sticker.isCover) 1 else 0,
INSTALLED to if (sticker.isInstalled) 1 else 0,
FILE_PATH to fileInfo.file.absolutePath,
FILE_LENGTH to fileInfo.length,
FILE_RANDOM to fileInfo.random
)
var updated = false
if (sticker.isCover) {
// Archive restore inserts cover rows without a sticker id, try to update first on a reduced uniqueness constraint
updated = writableDatabase
.update(TABLE_NAME)
.values(values)
.where("$PACK_ID = ? AND $COVER = 1", sticker.packId)
.run() > 0
}
if (!updated) {
writableDatabase
.insertInto(TABLE_NAME)
.values(values)
.run(SQLiteDatabase.CONFLICT_REPLACE)
}
notifyStickerListeners()
@@ -454,7 +469,7 @@ class StickerTable(
}
}
class StickerPackRecordReader(private val cursor: Cursor) : Closeable {
class StickerPackRecordReader(private val cursor: Cursor) : Closeable, Iterable<StickerPackRecord> {
fun getNext(): StickerPackRecord? {
if (!cursor.moveToNext()) {
@@ -486,5 +501,19 @@ class StickerTable(
override fun close() {
cursor.close()
}
override fun iterator(): Iterator<StickerPackRecord> {
return ReaderIterator()
}
private inner class ReaderIterator : Iterator<StickerPackRecord> {
override fun hasNext(): Boolean {
return cursor.count != 0 && !cursor.isLast
}
override fun next(): StickerPackRecord {
return getNext() ?: throw NoSuchElementException()
}
}
}
}

View File

@@ -469,6 +469,10 @@ class JobController {
return jobStorage.areQueuesEmpty(queueKeys);
}
synchronized boolean areFactoriesEmpty(@NonNull Set<String> factoryKeys) {
return jobStorage.areFactoriesEmpty(factoryKeys);
}
/**
* Initializes the dynamic JobRunner system with minimum threads.
*/

View File

@@ -436,6 +436,19 @@ public class JobManager implements ConstraintObserver.Notifier {
return jobController.areQueuesEmpty(queueKeys);
}
/**
* Can tell you if there are no jobs for the given factories at the time of invocation. It is worth noting
* that the state could change immediately after this method returns due to a call on some
* other thread, and you should take that into consideration when using the result.
*
* @return True if there are no jobs for the given factories at the time of invocation, otherwise false.
*/
@WorkerThread
public boolean areFactoriesEmpty(@NonNull Set<String> factoryKeys) {
waitUntilInitialized();
return jobController.areFactoriesEmpty(factoryKeys);
}
/**
* Pokes the system to take another pass at the job queue.
*/

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobmanager.impl
import android.app.job.JobInfo
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Constraint
import org.thoughtcrime.securesms.jobmanager.ConstraintObserver
import org.thoughtcrime.securesms.jobs.StickerDownloadJob
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
/**
* When met, no sticker download jobs should be in the job queue/running.
*/
object StickersNotDownloadingConstraint : Constraint {
const val KEY = "StickersNotDownloadingConstraint"
private val factoryKeys = setOf(StickerPackDownloadJob.KEY, StickerDownloadJob.KEY)
override fun isMet(): Boolean {
return AppDependencies.jobManager.areFactoriesEmpty(factoryKeys)
}
override fun getFactoryKey(): String = KEY
override fun applyToJobInfo(jobInfoBuilder: JobInfo.Builder) = Unit
object Observer : ConstraintObserver {
override fun register(notifier: ConstraintObserver.Notifier) {
AppDependencies.jobManager.addListener({ job -> factoryKeys.contains(job.factoryKey) }) { job, jobState ->
if (jobState.isComplete) {
if (isMet) {
notifier.onConstraintMet(KEY)
}
}
}
}
}
class Factory : Constraint.Factory<StickersNotDownloadingConstraint> {
override fun create(): StickersNotDownloadingConstraint {
return StickersNotDownloadingConstraint
}
}
}

View File

@@ -35,6 +35,9 @@ interface JobStorage {
@WorkerThread
fun areQueuesEmpty(queueKeys: Set<String>): Boolean
@WorkerThread
fun areFactoriesEmpty(factoryKeys: Set<String>): Boolean
@WorkerThread
fun markJobAsRunning(id: String, currentTime: Long)

View File

@@ -90,7 +90,8 @@ class BackupRestoreMediaJob private constructor(parameters: Parameters) : BaseJo
if (isWallpaper || shouldRestoreFullSize(message!!, restoreTime, SignalStore.backup.optimizeStorage)) {
restoreFullAttachmentJobs += RestoreAttachmentJob.forInitialRestore(
messageId = attachment.mmsId,
attachmentId = attachment.attachmentId
attachmentId = attachment.attachmentId,
stickerPackId = attachment.stickerPackId
)
} else {
restoreThumbnailJobs += RestoreAttachmentThumbnailJob(

View File

@@ -12,6 +12,7 @@ import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage
import org.thoughtcrime.securesms.util.LRUCache
import java.util.TreeSet
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Predicate
class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@@ -50,6 +51,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
/** We need a fast way to know what the "most eligible job" is for a given queue. This serves as a lookup table that speeds up the maintenance of [eligibleJobs]. */
private val mostEligibleJobForQueue: MutableMap<String, MinimalJobSpec> = hashMapOf()
/** Quick lookup of job counts per factory for all jobs */
private val factoryCountIndex: MutableMap<String, AtomicInteger> = hashMapOf()
@Synchronized
override fun init() {
val stopwatch = Stopwatch("init", decimalPlaces = 2)
@@ -62,6 +66,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
} else {
placeJobInEligibleList(job)
}
factoryCountIndex.getOrPut(job.factoryKey) { AtomicInteger(0) }.incrementAndGet()
}
stopwatch.split("sort-min-jobs")
@@ -105,6 +110,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
} else {
placeJobInEligibleList(minimalJobSpec)
}
factoryCountIndex.getOrPut(minimalJobSpec.factoryKey) { AtomicInteger(0) }.incrementAndGet()
constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs.toMutableList()
dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs.toMutableList()
@@ -178,9 +184,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getJobCountForFactory(factoryKey: String): Int {
return minimalJobs
.filter { it.factoryKey == factoryKey }
.size
return factoryCountIndex[factoryKey]?.get() ?: 0
}
@Synchronized
@@ -195,6 +199,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
return minimalJobs.none { it.queueKey != null && queueKeys.contains(it.queueKey) }
}
@Synchronized
override fun areFactoriesEmpty(factoryKeys: Set<String>): Boolean {
return factoryKeys.all { (factoryCountIndex[it]?.get() ?: 0) == 0 }
}
@Synchronized
override fun markJobAsRunning(id: String, currentTime: Long) {
val job: JobSpec? = getJobSpec(id)
@@ -301,6 +310,13 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
if (updatedJob != null) {
iterator.set(updatedJob.toMinimalJobSpec())
replaceJobInEligibleList(current, updatedJob.toMinimalJobSpec())
if (current.factoryKey != updatedJob.factoryKey) {
if (factoryCountIndex[current.factoryKey]?.decrementAndGet() == 0) {
factoryCountIndex.remove(current.factoryKey)
}
factoryCountIndex.getOrPut(updatedJob.factoryKey) { AtomicInteger(0) }.incrementAndGet()
}
}
}
}
@@ -357,6 +373,12 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
}
for (job in jobsToDelete) {
if (factoryCountIndex[job.factoryKey]?.decrementAndGet() == 0) {
factoryCountIndex.remove(job.factoryKey)
}
}
}
@Synchronized
@@ -426,6 +448,13 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
iterator.set(updated)
replaceJobInEligibleList(current, updated)
if (current.factoryKey != updated.factoryKey) {
if (factoryCountIndex[current.factoryKey]?.decrementAndGet() == 0) {
factoryCountIndex.remove(current.factoryKey)
}
factoryCountIndex.getOrPut(updated.factoryKey) { AtomicInteger(0) }.incrementAndGet()
}
jobSpecCache.remove(current.id)?.let { currentJobSpec ->
val updatedJobSpec = currentJobSpec.copy(
id = updated.id,

View File

@@ -34,6 +34,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.RestoreAttachmentConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.RestoreAttachmentConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SqlCipherMigrationConstraintObserver;
import org.thoughtcrime.securesms.jobmanager.impl.StickersNotDownloadingConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.WifiConstraint;
import org.thoughtcrime.securesms.jobmanager.migrations.DeprecatedJobMigration;
import org.thoughtcrime.securesms.jobmanager.migrations.DonationReceiptRedemptionJobMigration;
@@ -427,6 +428,7 @@ public final class JobManagerFactories {
put(RegisteredConstraint.KEY, new RegisteredConstraint.Factory());
put(RestoreAttachmentConstraint.KEY, new RestoreAttachmentConstraint.Factory(application));
put(SqlCipherMigrationConstraint.KEY, new SqlCipherMigrationConstraint.Factory(application));
put(StickersNotDownloadingConstraint.KEY, new StickersNotDownloadingConstraint.Factory());
put(WifiConstraint.KEY, new WifiConstraint.Factory(application));
}};
}
@@ -444,7 +446,8 @@ public final class JobManagerFactories {
NoRemoteArchiveGarbageCollectionPendingConstraint.Observer.INSTANCE,
RegisteredConstraint.Observer.INSTANCE,
BackupMessagesConstraintObserver.INSTANCE,
DeletionNotAwaitingMediaDownloadConstraint.Observer.INSTANCE);
DeletionNotAwaitingMediaDownloadConstraint.Observer.INSTANCE,
StickersNotDownloadingConstraint.Observer.INSTANCE);
}
public static List<JobMigration> getJobMigrations(@NonNull Application application) {

View File

@@ -34,13 +34,16 @@ import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil
import org.thoughtcrime.securesms.jobmanager.impl.BatteryNotLowConstraint
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobmanager.impl.RestoreAttachmentConstraint
import org.thoughtcrime.securesms.jobmanager.impl.StickersNotDownloadingConstraint
import org.thoughtcrime.securesms.jobs.protos.RestoreAttachmentJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.logsubmit.SubmitDebugLogActivity
import org.thoughtcrime.securesms.mms.MmsException
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.notifications.NotificationChannels
import org.thoughtcrime.securesms.notifications.NotificationIds
import org.thoughtcrime.securesms.service.BackupMediaRestoreService
import org.thoughtcrime.securesms.stickers.StickerLocator
import org.thoughtcrime.securesms.transport.RetryLaterException
import org.thoughtcrime.securesms.util.RemoteConfig
import org.thoughtcrime.securesms.util.SignalLocalMetrics
@@ -110,13 +113,14 @@ class RestoreAttachmentJob private constructor(
* Create a restore job for the initial large batch of media on a fresh restore.
* Will enqueue with some amount of parallization with low job priority.
*/
fun forInitialRestore(attachmentId: AttachmentId, messageId: Long): RestoreAttachmentJob {
fun forInitialRestore(attachmentId: AttachmentId, messageId: Long, stickerPackId: String?): RestoreAttachmentJob {
return RestoreAttachmentJob(
attachmentId = attachmentId,
messageId = messageId,
manual = false,
queue = Queues.INITIAL_RESTORE.random(),
priority = Parameters.PRIORITY_LOW
priority = Parameters.PRIORITY_LOW,
stickerPackId = stickerPackId
)
}
@@ -155,7 +159,7 @@ class RestoreAttachmentJob private constructor(
}
}
private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String, priority: Int) : this(
private constructor(messageId: Long, attachmentId: AttachmentId, manual: Boolean, queue: String, priority: Int, stickerPackId: String? = null) : this(
Parameters.Builder()
.setQueue(queue)
.apply {
@@ -165,6 +169,10 @@ class RestoreAttachmentJob private constructor(
addConstraint(RestoreAttachmentConstraint.KEY)
addConstraint(BatteryNotLowConstraint.KEY)
}
if (stickerPackId != null && SignalDatabase.stickers.isPackInstalled(stickerPackId)) {
addConstraint(StickersNotDownloadingConstraint.KEY)
}
}
.setLifespan(TimeUnit.DAYS.toMillis(30))
.setMaxAttempts(Parameters.UNLIMITED)
@@ -225,6 +233,28 @@ class RestoreAttachmentJob private constructor(
return
}
if (attachment.stickerLocator.isValid()) {
val locator = attachment.stickerLocator!!
val stickerRecord = SignalDatabase.stickers.getSticker(locator.packId, locator.stickerId, false)
if (stickerRecord != null) {
val dataStream = try {
PartAuthority.getAttachmentStream(context, stickerRecord.uri)
} catch (e: IOException) {
Log.w(TAG, "[$attachmentId] Attachment is sticker but no sticker available", e)
null
}
dataStream?.use { input ->
Log.i(TAG, "[$attachmentId] Attachment is sticker, restoring from local storage")
SignalDatabase.attachments.finalizeAttachmentAfterDownload(messageId, attachmentId, input, if (manual) System.currentTimeMillis().milliseconds else null)
return
}
}
Log.i(TAG, "[$attachmentId] Attachment is sticker, but unable to restore from local storage. Attempting to download.")
}
SignalLocalMetrics.ArchiveAttachmentRestore.start(attachmentId)
val progressServiceController = BackupMediaRestoreService.start(context, context.getString(R.string.BackupStatus__restoring_media))
@@ -372,7 +402,7 @@ class RestoreAttachmentJob private constructor(
throw IOException("Failed to delete temp download file following range exception")
}
} catch (e: InvalidAttachmentException) {
Log.w(TAG, e.message)
Log.w(TAG, "[$attachmentId] Invalid attachment: ${e.message}")
markFailed(attachmentId)
} catch (e: NonSuccessfulResponseCodeException) {
when (e.code) {
@@ -477,3 +507,10 @@ class RestoreAttachmentJob private constructor(
}
}
}
private fun StickerLocator?.isValid(): Boolean {
return this != null &&
this.packId.isNotNullOrBlank() &&
this.packKey.isNotNullOrBlank() &&
this.stickerId >= 0
}

View File

@@ -14,9 +14,9 @@ import org.signal.core.util.toOptional
import org.signal.libsignal.zkgroup.groups.GroupSecretParams
import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation
import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.LocalStickerAttachment
import org.thoughtcrime.securesms.attachments.PointerAttachment
import org.thoughtcrime.securesms.attachments.TombstoneAttachment
import org.thoughtcrime.securesms.attachments.UriAttachment
import org.thoughtcrime.securesms.calls.links.CallLinks
import org.thoughtcrime.securesms.components.emoji.EmojiUtil
import org.thoughtcrime.securesms.contactshare.Contact
@@ -82,7 +82,6 @@ import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.toPointersWith
import org.thoughtcrime.securesms.mms.IncomingMessage
import org.thoughtcrime.securesms.mms.MmsException
import org.thoughtcrime.securesms.mms.QuoteModel
import org.thoughtcrime.securesms.mms.StickerSlide
import org.thoughtcrime.securesms.notifications.v2.ConversationId
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.Recipient.HiddenState
@@ -108,7 +107,6 @@ import org.whispersystems.signalservice.internal.push.DataMessage
import org.whispersystems.signalservice.internal.push.Envelope
import org.whispersystems.signalservice.internal.push.GroupContextV2
import org.whispersystems.signalservice.internal.push.Preview
import java.security.SecureRandom
import java.util.Optional
import java.util.UUID
import kotlin.time.Duration
@@ -1211,25 +1209,7 @@ object DataMessageProcessor {
val stickerRecord: StickerRecord? = SignalDatabase.stickers.getSticker(stickerLocator.packId, stickerLocator.stickerId, false)
return if (stickerRecord != null) {
UriAttachment(
stickerRecord.uri,
stickerRecord.contentType,
AttachmentTable.TRANSFER_PROGRESS_DONE,
stickerRecord.size,
StickerSlide.WIDTH,
StickerSlide.HEIGHT,
null,
SecureRandom().nextLong().toString(),
false,
false,
false,
false,
null,
stickerLocator,
null,
null,
null
)
LocalStickerAttachment(stickerRecord, stickerLocator)
} else {
sticker.data_!!.toPointer(stickerLocator)
}

View File

@@ -977,6 +977,338 @@ class FastJobStorageTest {
assertThat(subject.areQueuesEmpty(TestHelpers.setOf("q4", "q5"))).isEqualTo(true)
}
@Test
fun `areFactoriesEmpty - all non-empty`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
subject.init()
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f1", "f2"))).isEqualTo(false)
subject.deleteJobs(listOf("id1"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f1", "f2"))).isEqualTo(false)
subject.deleteJobs(listOf("id2"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f1", "f2"))).isEqualTo(true)
}
@Test
fun `areFactoriesEmpty - mixed empty`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
subject.init()
assertThat(subject.areFactoriesEmpty(setOf("f1", "f5"))).isEqualTo(false)
subject.deleteJobs(listOf("id1"))
assertThat(subject.areFactoriesEmpty(setOf("f1", "f5"))).isEqualTo(true)
}
@Test
fun `areFactoriesEmpty - queue does not exist`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
subject.init()
assertThat(subject.areFactoriesEmpty(setOf("f4"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f4", "f5"))).isEqualTo(true)
subject.insertJobs(listOf(fullSpec("id4", "f4")))
assertThat(subject.areFactoriesEmpty(setOf("f4"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f4", "f5"))).isEqualTo(false)
}
@Test
fun `areFactoriesEmpty - empty set parameter`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
subject.init()
assertThat(subject.areFactoriesEmpty(emptySet())).isEqualTo(true)
}
@Test
fun `areFactoriesEmpty - factory key change via updateJobs maintains correct counts`() {
val fullSpec1 = fullSpec(id = "id1", factoryKey = "f1")
val fullSpec2 = fullSpec(id = "id2", factoryKey = "f1")
val subject = FastJobStorage(mockDatabase(listOf(fullSpec1, fullSpec2)))
subject.init()
// Initially, f1 has 2 jobs, f2 has 0
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(true)
// Update one job to change factory key from f1 to f2
val updatedJob = jobSpec(id = "id1", factoryKey = "f2")
subject.updateJobs(listOf(updatedJob))
// Now f1 has 1 job, f2 has 1 job
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
// Update the other job from f1 to f3
val updatedJob2 = jobSpec(id = "id2", factoryKey = "f3")
subject.updateJobs(listOf(updatedJob2))
// Now f1 has 0 jobs, f2 has 1 job, f3 has 1 job
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(false)
}
@Test
fun `areFactoriesEmpty - factory key change via transformJobs maintains correct counts`() {
val fullSpec1 = fullSpec(id = "id1", factoryKey = "f1")
val fullSpec2 = fullSpec(id = "id2", factoryKey = "f1")
val fullSpec3 = fullSpec(id = "id3", factoryKey = "f2")
val subject = FastJobStorage(mockDatabase(listOf(fullSpec1, fullSpec2, fullSpec3)))
subject.init()
// Initially, f1 has 2 jobs, f2 has 1 job, f3 has 0 jobs
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(true)
// Transform jobs: change job "id1" from f1 to f3, and job "id3" from f2 to f1
subject.transformJobs { job ->
when (job.id) {
"id1" -> job.copy(factoryKey = "f3")
"id3" -> job.copy(factoryKey = "f1")
else -> job
}
}
// Now f1 has 2 jobs (job "id2" + transformed job "id3"), f2 has 0 jobs, f3 has 1 job (transformed job "id1")
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(false)
}
@Test
fun `areFactoriesEmpty - memory-only jobs affect factory counts correctly`() {
val memoryJob = fullSpec(id = "id1", factoryKey = "f1")
val durableJob = fullSpec(id = "id2", factoryKey = "f1")
val subject = FastJobStorage(mockDatabase(listOf(memoryJob, durableJob)))
subject.init()
// Both memory-only and durable jobs should be counted for factory
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Delete the durable job - f1 should still have the memory-only job
subject.deleteJobs(listOf("id2"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Delete the memory-only job - now f1 should be empty
subject.deleteJobs(listOf("id1"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
// Insert a new memory-only job
val newMemoryJob = fullSpec(id = "id3", factoryKey = "f2")
subject.insertJobs(listOf(newMemoryJob))
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
}
@Test
fun `areFactoriesEmpty - migration queue jobs counted in factory index`() {
val normalJob = fullSpec(id = "id1", factoryKey = "f1", queueKey = "normalQueue")
val migrationJob = fullSpec(id = "id2", factoryKey = "f1", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY)
val otherMigrationJob = fullSpec(id = "id3", factoryKey = "f2", queueKey = Job.Parameters.MIGRATION_QUEUE_KEY)
val subject = FastJobStorage(mockDatabase(listOf(normalJob, migrationJob, otherMigrationJob)))
subject.init()
// Both normal and migration jobs should be counted for their factories
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false) // has normal + migration job
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false) // has migration job
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(true) // empty
// Delete the normal job - f1 should still have the migration job
subject.deleteJobs(listOf("id1"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Delete the migration job - now f1 should be empty
subject.deleteJobs(listOf("id2"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false) // still has migration job
// Delete the other migration job - now f2 should be empty
subject.deleteJobs(listOf("id3"))
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(true)
}
@Test
fun `areFactoriesEmpty - unrelated operations dont affect factory counts`() {
val subject = FastJobStorage(mockDatabase(listOf(DataSet1.FULL_SPEC_1)))
subject.init()
// Initial state
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Operations that should NOT affect factory counts
subject.markJobAsRunning("id1", 100)
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
subject.updateJobAfterRetry("id1", 200, 1, 1000, "data".toByteArray())
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
subject.updateAllJobsToBePending()
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Verify the job is still there and counts are still accurate
assertThat(subject.getJobSpec("id1")).isNotNull()
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(1)
}
@Test
fun `areFactoriesEmpty - multiple jobs same factory complex lifecycle`() {
val job1 = fullSpec(id = "id1", factoryKey = "f1")
val job2 = fullSpec(id = "id2", factoryKey = "f1")
val job3 = fullSpec(id = "id3", factoryKey = "f1")
val subject = FastJobStorage(mockDatabase(listOf(job1, job2, job3)))
subject.init()
// Initially f1 has 3 jobs
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(3)
// Delete 2 jobs, f1 should still have 1
subject.deleteJobs(listOf("id1", "id2"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(1)
// Add more jobs to the same factory
val job4 = fullSpec(id = "id4", factoryKey = "f1")
val job5 = fullSpec(id = "id5", factoryKey = "f1")
subject.insertJobs(listOf(job4, job5))
// Now f1 should have 3 jobs (remaining job3 + new job4 + job5)
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(3)
// Transform one job to different factory
subject.transformJobs { job ->
if (job.id == "id3") job.copy(factoryKey = "f2") else job
}
// Now f1 should have 2 jobs, f2 should have 1 job
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(2)
assertThat(subject.getJobCountForFactory("f2")).isEqualTo(1)
// Delete all remaining jobs from f1
subject.deleteJobs(listOf("id4", "id5"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(0)
}
@Test
fun `areFactoriesEmpty - factory count consistency after complex operations`() {
val job1 = fullSpec(id = "id1", factoryKey = "f1")
val job2 = fullSpec(id = "id2", factoryKey = "f2")
val subject = FastJobStorage(mockDatabase(listOf(job1, job2)))
subject.init()
// Complex operation chain: insert -> transform -> update -> delete
// 1. Insert new jobs
val newJobs = listOf(
fullSpec(id = "id3", factoryKey = "f1"),
fullSpec(id = "id4", factoryKey = "f3")
)
subject.insertJobs(newJobs)
// State: f1=2, f2=1, f3=1
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(false)
// 2. Transform jobs: change factory keys around
subject.transformJobs { job ->
when (job.id) {
"id1" -> job.copy(factoryKey = "f2") // f1 -> f2
"id2" -> job.copy(factoryKey = "f3") // f2 -> f3
"id3" -> job.copy(factoryKey = "f4") // f1 -> f4
else -> job
}
}
// State: f1=0, f2=1, f3=2, f4=1
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f2"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f4"))).isEqualTo(false)
// 3. Update jobs: change factory key again
val updatedJob = jobSpec(id = "id4", factoryKey = "f5") // f3 -> f5
subject.updateJobs(listOf(updatedJob))
// State: f1=0, f2=1, f3=1, f4=1, f5=1
assertThat(subject.areFactoriesEmpty(setOf("f3"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f5"))).isEqualTo(false)
// 4. Delete jobs and verify final counts
subject.deleteJobs(listOf("id1", "id2"))
// State: f1=0, f2=0, f3=0, f4=1, f5=1
assertThat(subject.areFactoriesEmpty(setOf("f1", "f2", "f3"))).isEqualTo(true)
assertThat(subject.areFactoriesEmpty(setOf("f4", "f5"))).isEqualTo(false)
assertThat(subject.areFactoriesEmpty(setOf("f1", "f2", "f3", "f4", "f5"))).isEqualTo(false)
// Verify actual counts match areFactoriesEmpty results
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(0)
assertThat(subject.getJobCountForFactory("f2")).isEqualTo(0)
assertThat(subject.getJobCountForFactory("f3")).isEqualTo(0)
assertThat(subject.getJobCountForFactory("f4")).isEqualTo(1)
assertThat(subject.getJobCountForFactory("f5")).isEqualTo(1)
}
@Test
fun `areFactoriesEmpty - atomic counter behavior edge cases`() {
val job1 = fullSpec(id = "id1", factoryKey = "f1")
val subject = FastJobStorage(mockDatabase(listOf(job1)))
subject.init()
// Initial state - f1 has 1 job
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Try to delete the same job multiple times - should be idempotent
subject.deleteJobs(listOf("id1"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
// Delete again - should not affect the count (already 0)
subject.deleteJobs(listOf("id1"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
// Delete non-existent job - should not affect counts
subject.deleteJobs(listOf("nonexistent"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
// Add job back and test multiple deletes in single call
val newJob = fullSpec(id = "id2", factoryKey = "f1")
subject.insertJobs(listOf(newJob))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
// Delete with duplicate IDs in the same call - should be handled gracefully
subject.deleteJobs(listOf("id2", "id2", "nonexistent"))
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(true)
// Verify factory counting works correctly after edge case operations
val finalJobs = listOf(
fullSpec(id = "id3", factoryKey = "f1"),
fullSpec(id = "id4", factoryKey = "f1")
)
subject.insertJobs(finalJobs)
assertThat(subject.areFactoriesEmpty(setOf("f1"))).isEqualTo(false)
assertThat(subject.getJobCountForFactory("f1")).isEqualTo(2)
}
private fun mockDatabase(fullSpecs: List<FullSpec> = emptyList()): JobDatabase {
val jobs = fullSpecs.map { it.jobSpec }.toMutableList()
val constraints = fullSpecs.map { it.constraintSpecs }.flatten().toMutableList()
@@ -1066,6 +1398,10 @@ class FastJobStorageTest {
return mock
}
private fun fullSpec(id: String, factoryKey: String, queueKey: String? = null): FullSpec {
return FullSpec(jobSpec(id, factoryKey, queueKey), emptyList(), emptyList())
}
private fun jobSpec(
id: String,
factoryKey: String,