Backfill missing attachment hashes.

This commit is contained in:
Greyson Parrelli
2024-03-15 13:29:47 -04:00
committed by Cody Henthorne
parent 6df1a68213
commit 1d29b0166d
7 changed files with 451 additions and 2 deletions

View File

@@ -94,6 +94,7 @@ import java.security.MessageDigest
import java.security.NoSuchAlgorithmException
import java.util.LinkedList
import java.util.Optional
import java.util.UUID
import kotlin.time.Duration.Companion.days
class AttachmentTable(
@@ -255,6 +256,80 @@ class AttachmentTable(
} ?: throw IOException("No stream for: $attachmentId")
}
/**
* Returns a [File] for an attachment that has no [DATA_HASH_END] and is in the [TRANSFER_PROGRESS_DONE] state, if present.
*/
/**
 * Returns a [File] for an attachment that has no [DATA_HASH_END] and is in the [TRANSFER_PROGRESS_DONE] state, if present.
 * Picks the most recently-inserted matching row; returns null when no such attachment exists.
 */
fun getUnhashedDataFile(): Pair<File, AttachmentId>? {
  val unhashedQuery = "$DATA_FILE NOT NULL AND $DATA_HASH_END IS NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE"

  return readableDatabase
    .select(ID, DATA_FILE)
    .from(TABLE_NAME)
    .where(unhashedQuery)
    .orderBy("$ID DESC")
    .limit(1)
    .run()
    .readToSingleObject { cursor ->
      File(cursor.requireNonNullString(DATA_FILE)) to AttachmentId(cursor.requireLong(ID))
    }
}
/**
* Sets the [DATA_HASH_END] for a given file. This is used to backfill the hash for attachments that were created before we started hashing them.
* As a result, this will _not_ update the hashes on files that are not fully uploaded.
*/
/**
 * Sets the [DATA_HASH_END] for a given file. This is used to backfill the hash for attachments that were created before we started hashing them.
 * As a result, this will _not_ update the hashes on files that are not fully uploaded.
 *
 * If another fully-uploaded attachment already has the same hash, this dedupes: matching rows are re-pointed at the pre-existing
 * file (copying its size/random), and our now-unused file is deleted if nothing else references it.
 *
 * @param file the data file whose rows should receive the hash
 * @param hash the raw SHA-256 digest of the file's plaintext contents
 */
fun setHashForDataFile(file: File, hash: ByteArray) {
  writableDatabase.withinTransaction { db ->
    val hashEnd = Base64.encodeWithPadding(hash)

    // Look for a *different* fully-uploaded file that already has this hash -- if one exists, we dedupe onto it.
    val (existingFile: String?, existingSize: Long?, existingRandom: ByteArray?) = db.select(DATA_FILE, DATA_SIZE, DATA_RANDOM)
      .from(TABLE_NAME)
      .where("$DATA_HASH_END = ? AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE AND $DATA_FILE NOT NULL AND $DATA_FILE != ?", hashEnd, file.absolutePath)
      .limit(1)
      .run()
      .readToSingleObject {
        Triple(
          it.requireString(DATA_FILE),
          it.requireLong(DATA_SIZE),
          it.requireBlob(DATA_RANDOM)
        )
      } ?: Triple(null, null, null)

    if (existingFile != null) {
      Log.i(TAG, "[setHashForDataFile] Found that a different file has the same HASH_END. Using that one instead. Pre-existing file: $existingFile", true)

      // Use the transaction's `db` handle (not `writableDatabase`) so this update participates in the enclosing transaction.
      val updateCount = db
        .update(TABLE_NAME)
        .values(
          DATA_FILE to existingFile,
          DATA_HASH_END to hashEnd,
          DATA_SIZE to existingSize,
          DATA_RANDOM to existingRandom
        )
        .where("$DATA_FILE = ? AND $DATA_HASH_END IS NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", file.absolutePath)
        .run()

      Log.i(TAG, "[setHashForDataFile] Deduped $updateCount attachments.", true)

      // Only delete our file if no remaining row (e.g. an in-progress download/upload) still references it.
      val oldFileInUse = db.exists(TABLE_NAME).where("$DATA_FILE = ?", file.absolutePath).run()
      if (oldFileInUse) {
        Log.i(TAG, "[setHashForDataFile] Old file is still in use by some in-progress attachment.", true)
      } else {
        Log.i(TAG, "[setHashForDataFile] Deleting unused file: $file")
        if (!file.delete()) {
          Log.w(TAG, "Failed to delete duped file!")
        }
      }
    } else {
      // No duplicate found -- just stamp the (already-encoded) hash onto all fully-uploaded rows using this file.
      val updateCount = db
        .update(TABLE_NAME)
        .values(DATA_HASH_END to hashEnd)
        .where("$DATA_FILE = ? AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", file.absolutePath)
        .run()

      Log.i(TAG, "[setHashForDataFile] Updated the HASH_END for $updateCount rows using file ${file.absolutePath}")
    }
  }
}
fun getAttachment(attachmentId: AttachmentId): DatabaseAttachment? {
return readableDatabase
.select(*PROJECTION)
@@ -482,6 +557,34 @@ class AttachmentTable(
return onDiskButNotInDatabase.size
}
/**
* Removes all references to the provided [DATA_FILE] from all attachments.
* Only do this if the file is known to not exist or has some other critical problem!
*/
/**
 * Removes all references to the provided [DATA_FILE] from all attachments.
 * Only do this if the file is known to not exist or has some other critical problem!
 *
 * @param file the on-disk data file whose references should be nulled out
 */
fun clearUsagesOfDataFile(file: File) {
  val updateCount = writableDatabase
    .update(TABLE_NAME)
    .values(DATA_FILE to null)
    .where("$DATA_FILE = ?", file.absolutePath)
    .run()

  // Fixed log tag: previously logged as "[clearUsagesOfFile]", which didn't match the method name.
  Log.i(TAG, "[clearUsagesOfDataFile] Cleared $updateCount usages of $file", true)
}
/**
* Indicates that, for whatever reason, a hash could not be calculated for the file in question.
* We put in a "bad hash" that will never match anything else so that we don't attempt to backfill it in the future.
*/
/**
 * Indicates that, for whatever reason, a hash could not be calculated for the file in question.
 * We put in a "bad hash" that will never match anything else so that we don't attempt to backfill it in the future.
 */
fun markDataFileAsUnhashable(file: File) {
  val path = file.absolutePath
  // A random sentinel guarantees this "hash" can never collide with a real one or another sentinel.
  val sentinelHash = "UNHASHABLE-${UUID.randomUUID()}"

  val updateCount = writableDatabase
    .update(TABLE_NAME)
    .values(DATA_HASH_END to sentinelHash)
    .where("$DATA_FILE = ? AND $DATA_HASH_END IS NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", path)
    .run()

  Log.i(TAG, "[markDataFileAsUnhashable] Marked $updateCount attachments as unhashable with file: $path", true)
}
fun deleteAllAttachments() {
Log.d(TAG, "[deleteAllAttachments]")
@@ -1610,6 +1713,11 @@ class AttachmentTable(
return existing.copy(sentMediaQuality = sentMediaQuality.code)
}
/** Creates a [TransformProperties] that carries only the provided sent-media-quality code. */
@JvmStatic
fun forSentMediaQuality(sentMediaQuality: Int): TransformProperties = TransformProperties(sentMediaQuality = sentMediaQuality)
@JvmStatic
fun parse(serialized: String?): TransformProperties {
return if (serialized == null) {

View File

@@ -0,0 +1,14 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobmanager
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil
import org.thoughtcrime.securesms.util.FeatureFlags
/**
 * Helper to calculate the default backoff interval for a [Job] given its run attempt count.
 *
 * Delegates to [BackoffUtil.exponentialBackoff] with (runAttempt + 1) as the attempt number,
 * capped by the feature-flagged maximum backoff.
 */
fun Job.defaultBackoffInterval(): Long = BackoffUtil.exponentialBackoff(runAttempt + 1, FeatureFlags.getDefaultMaxBackoff())

View File

@@ -0,0 +1,112 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.ThreadUtil
import org.signal.core.util.drain
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.defaultBackoffInterval
import java.io.File
import java.io.FileNotFoundException
import java.io.IOException
import java.security.DigestInputStream
import java.security.MessageDigest
/**
* This job backfills hashes for attachments that were sent before we started hashing them.
* In order to avoid hammering the device with hash calculations and disk I/O, this job will
* calculate the hash for a single attachment and then reschedule itself to run again if necessary.
*/
/**
 * This job backfills hashes for attachments that were sent before we started hashing them.
 * In order to avoid hammering the device with hash calculations and disk I/O, this job will
 * calculate the hash for a single attachment and then reschedule itself to run again if necessary.
 */
class AttachmentHashBackfillJob private constructor(parameters: Parameters) : Job(parameters) {

  companion object {
    val TAG = Log.tag(AttachmentHashBackfillJob::class.java)
    const val KEY = "AttachmentHashBackfillJob"
  }

  // The file currently being hashed; read by onFailure() so a permanently-failing file can be marked unhashable.
  private var activeFile: File? = null

  constructor() : this(
    Parameters.Builder()
      .setQueue(KEY)
      .setMaxInstancesForFactory(2)
      .setLifespan(Parameters.IMMORTAL)
      .setMaxAttempts(10)
      .build()
  )

  // Job carries no state worth persisting -- the next unhashed file is re-queried on each run.
  override fun serialize() = null

  override fun getFactoryKey() = KEY

  override fun run(): Result {
    // Fetch the next attachment lacking a hash. If none remain, the backfill is finished.
    val (file: File?, attachmentId: AttachmentId?) = SignalDatabase.attachments.getUnhashedDataFile() ?: (null to null)
    if (file == null || attachmentId == null) {
      Log.i(TAG, "No more unhashed files! Task complete.")
      return Result.success()
    }

    activeFile = file

    if (!file.exists()) {
      // The DB references a file that's gone from disk -- clear the references and move on to the next one.
      Log.w(TAG, "File does not exist! Clearing all usages.", true)
      SignalDatabase.attachments.clearUsagesOfDataFile(file)
      ApplicationDependencies.getJobManager().add(AttachmentHashBackfillJob())
      return Result.success()
    }

    try {
      // Hash the decrypted attachment contents via a digesting pass over the full stream.
      val inputStream = SignalDatabase.attachments.getAttachmentStream(attachmentId, 0)
      val messageDigest = MessageDigest.getInstance("SHA-256")

      DigestInputStream(inputStream, messageDigest).use {
        it.drain()
      }

      val hash = messageDigest.digest()

      SignalDatabase.attachments.setHashForDataFile(file, hash)
    } catch (e: FileNotFoundException) {
      // File vanished between the exists() check and opening the stream -- clear references; fall through to reschedule.
      Log.w(TAG, "File could not be found! Clearing all usages.", true)
      SignalDatabase.attachments.clearUsagesOfDataFile(file)
    } catch (e: IOException) {
      Log.e(TAG, "Error hashing attachment. Retrying.", e)
      if (e.cause is FileNotFoundException) {
        // Some streams wrap the FNFE -- treat it the same as the direct case above.
        Log.w(TAG, "Underlying cause was a FileNotFoundException. Clearing all usages.", true)
        SignalDatabase.attachments.clearUsagesOfDataFile(file)
      } else {
        // Transient I/O error: retry this same attachment with exponential backoff.
        return Result.retry(defaultBackoffInterval())
      }
    }

    // Sleep just so we don't hammer the device with hash calculations and disk I/O
    ThreadUtil.sleep(1000)

    // Enqueue a fresh instance to process the next unhashed attachment (one attachment per run).
    ApplicationDependencies.getJobManager().add(AttachmentHashBackfillJob())
    return Result.success()
  }

  override fun onFailure() {
    // Out of attempts for this file -- poison it with a sentinel hash so the backfill doesn't get stuck on it forever.
    activeFile?.let { file ->
      Log.w(TAG, "Failed to calculate hash, marking as unhashable: $file", true)
      SignalDatabase.attachments.markDataFileAsUnhashable(file)
    } ?: Log.w(TAG, "Job failed, but no active file is set!")

    // Keep the backfill moving past the bad file.
    ApplicationDependencies.getJobManager().add(AttachmentHashBackfillJob())
  }

  class Factory : Job.Factory<AttachmentHashBackfillJob> {
    override fun create(parameters: Parameters, serializedData: ByteArray?): AttachmentHashBackfillJob {
      return AttachmentHashBackfillJob(parameters)
    }
  }
}

View File

@@ -40,6 +40,7 @@ import org.thoughtcrime.securesms.migrations.AccountConsistencyMigrationJob;
import org.thoughtcrime.securesms.migrations.AccountRecordMigrationJob;
import org.thoughtcrime.securesms.migrations.ApplyUnknownFieldsToSelfMigrationJob;
import org.thoughtcrime.securesms.migrations.AttachmentCleanupMigrationJob;
import org.thoughtcrime.securesms.migrations.AttachmentHashBackfillMigrationJob;
import org.thoughtcrime.securesms.migrations.AttributesMigrationJob;
import org.thoughtcrime.securesms.migrations.AvatarIdRemovalMigrationJob;
import org.thoughtcrime.securesms.migrations.AvatarMigrationJob;
@@ -101,6 +102,7 @@ public final class JobManagerFactories {
put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory());
put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory());
put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory());
put(AttachmentHashBackfillJob.KEY, new AttachmentHashBackfillJob.Factory());
put(AttachmentMarkUploadedJob.KEY, new AttachmentMarkUploadedJob.Factory());
put(AttachmentUploadJob.KEY, new AttachmentUploadJob.Factory());
put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory());
@@ -225,6 +227,7 @@ public final class JobManagerFactories {
put(AccountRecordMigrationJob.KEY, new AccountRecordMigrationJob.Factory());
put(ApplyUnknownFieldsToSelfMigrationJob.KEY, new ApplyUnknownFieldsToSelfMigrationJob.Factory());
put(AttachmentCleanupMigrationJob.KEY, new AttachmentCleanupMigrationJob.Factory());
put(AttachmentHashBackfillMigrationJob.KEY, new AttachmentHashBackfillMigrationJob.Factory());
put(AttributesMigrationJob.KEY, new AttributesMigrationJob.Factory());
put(AvatarIdRemovalMigrationJob.KEY, new AvatarIdRemovalMigrationJob.Factory());
put(AvatarMigrationJob.KEY, new AvatarMigrationJob.Factory());

View File

@@ -145,11 +145,12 @@ public class ApplicationMigrations {
static final int STORAGE_LOCAL_UNKNOWNS_FIX = 101;
static final int PNP_LAUNCH = 102;
static final int EMOJI_VERSION_10 = 103;
static final int ATTACHMENT_HASH_BACKFILL = 104;
}
public static final int CURRENT_VERSION = 103;
public static final int CURRENT_VERSION = 104;
/**
* This *must* be called after the {@link JobManager} has been instantiated, but *before* the call
* to {@link JobManager#beginJobLoop()}. Otherwise, other non-migration jobs may have started
* executing before we add the migration jobs.
@@ -662,6 +663,10 @@ public class ApplicationMigrations {
jobs.put(Version.EMOJI_VERSION_10, new EmojiDownloadMigrationJob());
}
if (lastSeenVersion < Version.ATTACHMENT_HASH_BACKFILL) {
jobs.put(Version.ATTACHMENT_HASH_BACKFILL, new AttachmentHashBackfillMigrationJob());
}
return jobs;
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.migrations
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.AttachmentHashBackfillJob
import java.lang.Exception
/**
* Kicks off the attachment hash backfill process by enqueueing a [AttachmentHashBackfillJob].
*/
/**
 * One-shot migration that starts the attachment hash backfill by enqueueing an [AttachmentHashBackfillJob].
 * Non-blocking and never retried: the enqueued job carries the actual work.
 */
internal class AttachmentHashBackfillMigrationJob(parameters: Parameters = Parameters.Builder().build()) : MigrationJob(parameters) {

  override fun getFactoryKey(): String = KEY

  override fun isUiBlocking(): Boolean = false

  override fun performMigration() {
    ApplicationDependencies.getJobManager().add(AttachmentHashBackfillJob())
  }

  override fun shouldRetry(e: Exception): Boolean = false

  class Factory : Job.Factory<AttachmentHashBackfillMigrationJob> {
    override fun create(parameters: Parameters, serializedData: ByteArray?): AttachmentHashBackfillMigrationJob =
      AttachmentHashBackfillMigrationJob(parameters)
  }

  companion object {
    val TAG = Log.tag(AttachmentHashBackfillMigrationJob::class.java)
    const val KEY = "AttachmentHashBackfillMigrationJob"
  }
}