Add migration to backfill digests.

This commit is contained in:
Greyson Parrelli
2024-09-04 11:25:15 -04:00
committed by Cody Henthorne
parent a8bf03af89
commit d59985c7b1
8 changed files with 255 additions and 8 deletions

View File

@@ -2,13 +2,13 @@ package org.thoughtcrime.securesms.backup
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import org.signal.core.util.logging.Log import org.signal.core.util.logging.Log
import org.signal.core.util.stream.NullOutputStream
import org.thoughtcrime.securesms.backup.proto.Attachment import org.thoughtcrime.securesms.backup.proto.Attachment
import org.thoughtcrime.securesms.backup.proto.Avatar import org.thoughtcrime.securesms.backup.proto.Avatar
import org.thoughtcrime.securesms.backup.proto.BackupFrame import org.thoughtcrime.securesms.backup.proto.BackupFrame
import org.thoughtcrime.securesms.backup.proto.Sticker import org.thoughtcrime.securesms.backup.proto.Sticker
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
import java.io.OutputStream
/** /**
* Given a backup file, run over it and verify it will decrypt properly when attempting to import it. * Given a backup file, run over it and verify it will decrypt properly when attempting to import it.
@@ -89,10 +89,4 @@ object BackupVerifier {
} }
return true return true
} }
private object NullOutputStream : OutputStream() {
override fun write(b: Int) = Unit
override fun write(b: ByteArray?) = Unit
override fun write(b: ByteArray?, off: Int, len: Int) = Unit
}
} }

View File

@@ -1224,6 +1224,58 @@ class AttachmentTable(
} }
} }
/**
 * Migration support query: finds attachments that are missing any part of the (key, IV, digest)
 * tuple and therefore need a new digest computed. Only attachments that have fully finished
 * downloading (and still have a local data file to read from) are returned.
 */
fun getAttachmentsThatNeedNewDigests(): List<AttachmentId> {
  val missingDigestComponents =
    """
      (
        $REMOTE_KEY IS NULL OR
        $REMOTE_IV IS NULL OR
        $REMOTE_DIGEST IS NULL
      )
      AND
      (
        $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE AND
        $DATA_FILE NOT NULL
      )
    """

  return readableDatabase
    .select(ID)
    .from(TABLE_NAME)
    .where(missingDigestComponents)
    .run()
    .readToList { cursor -> AttachmentId(cursor.requireLong(ID)) }
}
/**
 * Corrects the stored size of an attachment. Exists because a temporary bug saved the wrong
 * size for some attachments; callers pass the true, re-measured size here.
 */
fun updateSize(attachmentId: AttachmentId, size: Long) {
  val sizeUpdate = writableDatabase
    .update(TABLE_NAME)
    .values(DATA_SIZE to size)

  sizeUpdate
    .where("$ID = ?", attachmentId.id)
    .run()
}
/**
 * Writes the complete (key, IV, digest) tuple for an attachment in a single update, as part of
 * the digest backfill process. The key is stored base64-encoded; the IV and digest are stored
 * as raw bytes.
 */
fun updateKeyIvDigest(attachmentId: AttachmentId, key: ByteArray, iv: ByteArray, digest: ByteArray) {
  writableDatabase
    .update(TABLE_NAME)
    .values(
      REMOTE_DIGEST to digest,
      REMOTE_IV to iv,
      REMOTE_KEY to Base64.encodeWithPadding(key)
    )
    .where("$ID = ?", attachmentId.id)
    .run()
}
/** /**
* Inserts new attachments in the table. The [Attachment]s may or may not have data, depending on whether it's an attachment we created locally or some * Inserts new attachments in the table. The [Attachment]s may or may not have data, depending on whether it's an attachment we created locally or some
* inbound attachment that we haven't fetched yet. * inbound attachment that we haven't fetched yet.

View File

@@ -0,0 +1,133 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.Base64
import org.signal.core.util.StreamUtil
import org.signal.core.util.logging.Log
import org.signal.core.util.readLength
import org.signal.core.util.stream.NullOutputStream
import org.signal.core.util.withinTransaction
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.protos.BackfillDigestJobData
import org.thoughtcrime.securesms.util.Util
import org.whispersystems.signalservice.api.crypto.AttachmentCipherOutputStream
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import java.io.IOException
/**
 * For attachments that were created before we saved IVs, this will generate an IV and update the corresponding digest.
 * This is important for backupsV2, where we need to know an attachment's digest in advance.
 *
 * This job needs to be careful to (1) minimize time in the transaction, and (2) never write partial results to disk, i.e. only write the full (key/iv/digest)
 * tuple together all at once (partial writes could poison the db, preventing us from retrying properly in the event of a crash or transient error).
 */
class BackfillDigestJob private constructor(
  private val attachmentId: AttachmentId,
  params: Parameters
) : Job(params) {

  companion object {
    private val TAG = Log.tag(BackfillDigestJob::class)

    const val KEY = "BackfillDigestJob"
  }

  // Low priority + immortal lifespan: this is best-effort background backfill that must eventually
  // run for every scheduled attachment, but should never compete with user-visible work.
  constructor(attachmentId: AttachmentId) : this(
    attachmentId = attachmentId,
    params = Parameters.Builder()
      .setQueue("BackfillDigestJob")
      .setMaxAttempts(3)
      .setLifespan(Parameters.IMMORTAL)
      .setPriority(Parameters.PRIORITY_LOW)
      .build()
  )

  override fun serialize(): ByteArray {
    return BackfillDigestJobData(attachmentId = attachmentId.id).encode()
  }

  override fun getFactoryKey(): String = KEY

  override fun run(): Result {
    // Gather everything we need inside a single transaction. NOTE(review): the `return Result.success()`
    // statements inside this lambda are non-local returns out of run() itself — this relies on
    // withinTransaction being an inline function; confirm against its declaration.
    val (originalKey, originalIv, decryptingStream) = SignalDatabase.rawDatabase.withinTransaction {
      val attachment = SignalDatabase.attachments.getAttachment(attachmentId)

      // All three skip cases below are terminal successes: there's nothing to backfill, and retrying won't help.
      if (attachment == null) {
        Log.w(TAG, "$attachmentId no longer exists! Skipping.")
        return Result.success()
      }

      if (!attachment.hasData) {
        Log.w(TAG, "$attachmentId no longer has any data! Skipping.")
        return Result.success()
      }

      if (attachment.remoteKey != null && attachment.remoteIv != null && attachment.remoteDigest != null) {
        Log.w(TAG, "$attachmentId already has all required components! Skipping.")
        return Result.success()
      }

      // There was a bug where we were accidentally saving the padded size for the attachment as the actual size. This corrects that.
      // However, we're in a transaction, and reading a file is expensive in general, so we only do this if the length is unset or set to the padded size.
      // Given that the padding algorithm targets padding <= 5%, and most attachments are a couple hundred kb, this should greatly limit the false positive rate
      // to something like 1 in 10,000ish.
      val fileLength = if (attachment.size == 0L || attachment.size == PaddingInputStream.getPaddedSize(attachment.size)) {
        try {
          // Re-measure the actual on-disk length by streaming the decrypted data.
          SignalDatabase.attachments.getAttachmentStream(attachmentId, offset = 0).use { it.readLength() }
        } catch (e: IOException) {
          Log.w(TAG, "Could not open a stream for $attachmentId while calculating the length. Assuming that the file no longer exists. Skipping.", e)
          return Result.success()
        }
      } else {
        attachment.size
      }

      if (fileLength != attachment.size) {
        Log.w(TAG, "$attachmentId had a saved size of ${attachment.size} but the actual size is $fileLength. Will update.")
        SignalDatabase.attachments.updateSize(attachmentId, fileLength)
      }

      // Open a fresh stream for the digest computation; it is consumed after the transaction commits,
      // keeping the expensive encryption pass outside the transaction.
      val stream = try {
        SignalDatabase.attachments.getAttachmentStream(attachmentId, offset = 0)
      } catch (e: IOException) {
        Log.w(TAG, "Could not open a stream for $attachmentId. Assuming that the file no longer exists. Skipping.", e)
        return Result.success()
      }

      // In order to match the exact digest calculation, we need to use the same padding that we would use when uploading the attachment.
      // remoteKey is stored base64-encoded (see updateKeyIvDigest), hence the decode here.
      Triple(attachment.remoteKey?.let { Base64.decode(it) }, attachment.remoteIv, PaddingInputStream(stream, fileLength))
    }

    // Keep any components that already exist and generate only the missing ones, so an attachment that
    // already has a key keeps producing the same ciphertext keystream. 64-byte key / 16-byte IV are the
    // sizes AttachmentCipherOutputStream expects — presumably split into cipher + MAC halves; confirm there.
    val key = originalKey ?: Util.getSecretBytes(64)
    val iv = originalIv ?: Util.getSecretBytes(16)

    // Encrypt into NullOutputStream: we only want the digest of the transmitted (padded + encrypted) bytes,
    // not the ciphertext itself.
    val cipherOutputStream = AttachmentCipherOutputStream(key, iv, NullOutputStream)
    StreamUtil.copy(decryptingStream, cipherOutputStream)
    // NOTE(review): assumes StreamUtil.copy flushes/closes both streams so transmittedDigest is final — verify.

    val digest = cipherOutputStream.transmittedDigest

    // Persist the full tuple in one write — never partially — per the class contract above.
    SignalDatabase.attachments.updateKeyIvDigest(
      attachmentId = attachmentId,
      key = key,
      iv = iv,
      digest = digest
    )

    return Result.success()
  }

  override fun onFailure() {
    Log.w(TAG, "Failed to backfill digest for $attachmentId!")
  }

  class Factory : Job.Factory<BackfillDigestJob> {
    override fun create(parameters: Parameters, serializedData: ByteArray?): BackfillDigestJob {
      val attachmentId = AttachmentId(BackfillDigestJobData.ADAPTER.decode(serializedData!!).attachmentId)
      return BackfillDigestJob(attachmentId, parameters)
    }
  }
}

View File

@@ -46,6 +46,7 @@ import org.thoughtcrime.securesms.migrations.AttachmentHashBackfillMigrationJob;
import org.thoughtcrime.securesms.migrations.AttributesMigrationJob; import org.thoughtcrime.securesms.migrations.AttributesMigrationJob;
import org.thoughtcrime.securesms.migrations.AvatarIdRemovalMigrationJob; import org.thoughtcrime.securesms.migrations.AvatarIdRemovalMigrationJob;
import org.thoughtcrime.securesms.migrations.AvatarMigrationJob; import org.thoughtcrime.securesms.migrations.AvatarMigrationJob;
import org.thoughtcrime.securesms.migrations.BackfillDigestsMigrationJob;
import org.thoughtcrime.securesms.migrations.BackupJitterMigrationJob; import org.thoughtcrime.securesms.migrations.BackupJitterMigrationJob;
import org.thoughtcrime.securesms.migrations.BackupNotificationMigrationJob; import org.thoughtcrime.securesms.migrations.BackupNotificationMigrationJob;
import org.thoughtcrime.securesms.migrations.BlobStorageLocationMigrationJob; import org.thoughtcrime.securesms.migrations.BlobStorageLocationMigrationJob;
@@ -116,6 +117,7 @@ public final class JobManagerFactories {
put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory()); put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory());
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory()); put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());
put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory()); put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory());
put(BackfillDigestJob.KEY, new BackfillDigestJob.Factory());
put(BackupMessagesJob.KEY, new BackupMessagesJob.Factory()); put(BackupMessagesJob.KEY, new BackupMessagesJob.Factory());
put(BackupRestoreJob.KEY, new BackupRestoreJob.Factory()); put(BackupRestoreJob.KEY, new BackupRestoreJob.Factory());
put(BackupRestoreMediaJob.KEY, new BackupRestoreMediaJob.Factory()); put(BackupRestoreMediaJob.KEY, new BackupRestoreMediaJob.Factory());
@@ -258,6 +260,7 @@ public final class JobManagerFactories {
put(AttributesMigrationJob.KEY, new AttributesMigrationJob.Factory()); put(AttributesMigrationJob.KEY, new AttributesMigrationJob.Factory());
put(AvatarIdRemovalMigrationJob.KEY, new AvatarIdRemovalMigrationJob.Factory()); put(AvatarIdRemovalMigrationJob.KEY, new AvatarIdRemovalMigrationJob.Factory());
put(AvatarMigrationJob.KEY, new AvatarMigrationJob.Factory()); put(AvatarMigrationJob.KEY, new AvatarMigrationJob.Factory());
put(BackfillDigestsMigrationJob.KEY, new BackfillDigestsMigrationJob.Factory());
put(BackupJitterMigrationJob.KEY, new BackupJitterMigrationJob.Factory()); put(BackupJitterMigrationJob.KEY, new BackupJitterMigrationJob.Factory());
put(BackupNotificationMigrationJob.KEY, new BackupNotificationMigrationJob.Factory()); put(BackupNotificationMigrationJob.KEY, new BackupNotificationMigrationJob.Factory());
put(BlobStorageLocationMigrationJob.KEY, new BlobStorageLocationMigrationJob.Factory()); put(BlobStorageLocationMigrationJob.KEY, new BlobStorageLocationMigrationJob.Factory());

View File

@@ -154,9 +154,10 @@ public class ApplicationMigrations {
static final int EXPIRE_TIMER_CAPABILITY = 109; static final int EXPIRE_TIMER_CAPABILITY = 109;
static final int REBUILD_MESSAGE_FTS_INDEX_6 = 110; static final int REBUILD_MESSAGE_FTS_INDEX_6 = 110;
static final int EXPIRE_TIMER_CAPABILITY_2 = 111; static final int EXPIRE_TIMER_CAPABILITY_2 = 111;
static final int BACKFILL_DIGESTS = 112;
} }
public static final int CURRENT_VERSION = 111; public static final int CURRENT_VERSION = 112;
/** /**
* This *must* be called after the {@link JobManager} has been instantiated, but *before* the call * This *must* be called after the {@link JobManager} has been instantiated, but *before* the call
@@ -703,6 +704,10 @@ public class ApplicationMigrations {
jobs.put(Version.EXPIRE_TIMER_CAPABILITY_2, new AttributesMigrationJob()); jobs.put(Version.EXPIRE_TIMER_CAPABILITY_2, new AttributesMigrationJob());
} }
if (lastSeenVersion < Version.BACKFILL_DIGESTS) {
jobs.put(Version.BACKFILL_DIGESTS, new BackfillDigestsMigrationJob());
}
return jobs; return jobs;
} }

View File

@@ -0,0 +1,39 @@
package org.thoughtcrime.securesms.migrations
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.BackfillDigestJob
/**
 * App migration that enqueues a [BackfillDigestJob] for every attachment that is missing part of
 * its (key, IV, digest) tuple. The per-attachment work happens in the individual jobs, so this
 * migration itself is cheap and non-blocking.
 */
internal class BackfillDigestsMigrationJob(
  parameters: Parameters = Parameters.Builder().build()
) : MigrationJob(parameters) {

  companion object {
    val TAG = Log.tag(BackfillDigestsMigrationJob::class.java)
    const val KEY = "BackfillDigestsMigrationJob"
  }

  override fun getFactoryKey(): String = KEY

  override fun isUiBlocking(): Boolean = false

  override fun performMigration() {
    val backfillJobs = SignalDatabase.attachments
      .getAttachmentsThatNeedNewDigests()
      .map { attachmentId -> BackfillDigestJob(attachmentId) }

    AppDependencies.jobManager.addAll(backfillJobs)
  }

  override fun shouldRetry(e: Exception): Boolean = false

  class Factory : Job.Factory<BackfillDigestsMigrationJob> {
    override fun create(parameters: Parameters, serializedData: ByteArray?): BackfillDigestsMigrationJob {
      return BackfillDigestsMigrationJob(parameters)
    }
  }
}

View File

@@ -125,3 +125,7 @@ message RestoreLocalAttachmentJobData {
string fileUri = 3; string fileUri = 3;
uint64 fileSize = 4; uint64 fileSize = 4;
} }
// Serialized state for BackfillDigestJob: identifies the attachment row whose
// (key, IV, digest) tuple should be backfilled.
message BackfillDigestJobData {
  uint64 attachmentId = 1;
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util.stream
import java.io.OutputStream
/**
 * An [OutputStream] that silently discards everything written to it — the moral equivalent of
 * piping to /dev/null. Useful when a consumer (e.g. a digesting cipher stream) needs a sink but
 * the bytes themselves are irrelevant. All write overloads, including null arrays, are no-ops.
 */
object NullOutputStream : OutputStream() {
  override fun write(b: Int) {
    // Intentionally discard.
  }

  override fun write(b: ByteArray?) {
    // Intentionally discard.
  }

  override fun write(b: ByteArray?, off: Int, len: Int) {
    // Intentionally discard.
  }
}