Fix issues with archive uploads matching digest.

This commit is contained in:
Greyson Parrelli
2024-09-18 14:11:36 -04:00
parent 48bd57c56a
commit 1ac19e84c2
13 changed files with 192 additions and 68 deletions

View File

@@ -73,8 +73,8 @@ import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.Pro
import org.whispersystems.signalservice.api.push.ServiceId.ACI import org.whispersystems.signalservice.api.push.ServiceId.ACI
import org.whispersystems.signalservice.api.push.ServiceId.PNI import org.whispersystems.signalservice.api.push.ServiceId.PNI
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm
import org.whispersystems.signalservice.internal.push.SubscriptionsConfiguration import org.whispersystems.signalservice.internal.push.SubscriptionsConfiguration
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.File import java.io.File
import java.io.IOException import java.io.IOException
@@ -606,27 +606,30 @@ object BackupRepository {
} }
/** /**
* Retrieves an upload spec that can be used to upload attachment media. * Retrieves an [AttachmentUploadForm] that can be used to upload an attachment to the transit cdn.
* To continue the upload, use [org.whispersystems.signalservice.api.attachment.AttachmentApi.getResumableUploadSpec].
*
* It's important to note that in order to get this to the archive cdn, you still need to use [copyAttachmentToArchive].
*/ */
fun getMediaUploadSpec(secretKey: ByteArray? = null): NetworkResult<ResumableUploadSpec> { fun getAttachmentUploadForm(): NetworkResult<AttachmentUploadForm> {
val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey() val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey()
return initBackupAndFetchAuth(backupKey) return initBackupAndFetchAuth(backupKey)
.then { credential -> .then { credential ->
SignalNetwork.archive.getMediaUploadForm(backupKey, SignalStore.account.requireAci(), credential) SignalNetwork.archive.getMediaUploadForm(backupKey, SignalStore.account.requireAci(), credential)
} }
.then { form ->
SignalNetwork.archive.getResumableUploadSpec(form, secretKey)
}
} }
fun archiveThumbnail(thumbnailAttachment: Attachment, parentAttachment: DatabaseAttachment): NetworkResult<ArchiveMediaResponse> { /**
* Copies a thumbnail that has been uploaded to the transit cdn to the archive cdn.
*/
fun copyThumbnailToArchive(thumbnailAttachment: Attachment, parentAttachment: DatabaseAttachment): NetworkResult<ArchiveMediaResponse> {
val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey() val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey()
val request = thumbnailAttachment.toArchiveMediaRequest(parentAttachment.getThumbnailMediaName(), backupKey) val request = thumbnailAttachment.toArchiveMediaRequest(parentAttachment.getThumbnailMediaName(), backupKey)
return initBackupAndFetchAuth(backupKey) return initBackupAndFetchAuth(backupKey)
.then { credential -> .then { credential ->
SignalNetwork.archive.archiveAttachmentMedia( SignalNetwork.archive.copyAttachmentToArchive(
backupKey = backupKey, backupKey = backupKey,
aci = SignalStore.account.requireAci(), aci = SignalStore.account.requireAci(),
serviceCredential = credential, serviceCredential = credential,
@@ -635,7 +638,10 @@ object BackupRepository {
} }
} }
fun archiveMedia(attachment: DatabaseAttachment): NetworkResult<Unit> { /**
* Copies an attachment that has been uploaded to the transit cdn to the archive cdn.
*/
fun copyAttachmentToArchive(attachment: DatabaseAttachment): NetworkResult<Unit> {
val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey() val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey()
return initBackupAndFetchAuth(backupKey) return initBackupAndFetchAuth(backupKey)
@@ -643,7 +649,7 @@ object BackupRepository {
val mediaName = attachment.getMediaName() val mediaName = attachment.getMediaName()
val request = attachment.toArchiveMediaRequest(mediaName, backupKey) val request = attachment.toArchiveMediaRequest(mediaName, backupKey)
SignalNetwork.archive SignalNetwork.archive
.archiveAttachmentMedia( .copyAttachmentToArchive(
backupKey = backupKey, backupKey = backupKey,
aci = SignalStore.account.requireAci(), aci = SignalStore.account.requireAci(),
serviceCredential = credential, serviceCredential = credential,
@@ -658,7 +664,7 @@ object BackupRepository {
.also { Log.i(TAG, "archiveMediaResult: $it") } .also { Log.i(TAG, "archiveMediaResult: $it") }
} }
fun archiveMedia(databaseAttachments: List<DatabaseAttachment>): NetworkResult<BatchArchiveMediaResult> { fun copyAttachmentToArchive(databaseAttachments: List<DatabaseAttachment>): NetworkResult<BatchArchiveMediaResult> {
val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey() val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey()
return initBackupAndFetchAuth(backupKey) return initBackupAndFetchAuth(backupKey)
@@ -676,7 +682,7 @@ object BackupRepository {
} }
SignalNetwork.archive SignalNetwork.archive
.archiveAttachmentMedia( .copyAttachmentToArchive(
backupKey = backupKey, backupKey = backupKey,
aci = SignalStore.account.requireAci(), aci = SignalStore.account.requireAci(),
serviceCredential = credential, serviceCredential = credential,

View File

@@ -244,7 +244,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() {
.filter { attachments.contains(it.dbAttachment.attachmentId) } .filter { attachments.contains(it.dbAttachment.attachmentId) }
.map { it.dbAttachment } .map { it.dbAttachment }
BackupRepository.archiveMedia(toArchive) BackupRepository.copyAttachmentToArchive(toArchive)
} }
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(Schedulers.single()) .observeOn(Schedulers.single())
@@ -268,7 +268,7 @@ class InternalBackupPlaygroundViewModel : ViewModel() {
} }
fun archiveAttachmentMedia(attachment: BackupAttachment) { fun archiveAttachmentMedia(attachment: BackupAttachment) {
disposables += Single.fromCallable { BackupRepository.archiveMedia(attachment.dbAttachment) } disposables += Single.fromCallable { BackupRepository.copyAttachmentToArchive(attachment.dbAttachment) }
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(Schedulers.single()) .observeOn(Schedulers.single())
.doOnSubscribe { _mediaState.set { update(inProgress = inProgressMediaIds + attachment.dbAttachment.attachmentId) } } .doOnSubscribe { _mediaState.set { update(inProgress = inProgressMediaIds + attachment.dbAttachment.attachmentId) } }

View File

@@ -539,6 +539,54 @@ class AttachmentTable(
.readToList { AttachmentId(it.requireLong(ID)) } .readToList { AttachmentId(it.requireLong(ID)) }
} }
/**
* At archive creation time, we need to ensure that all relevant attachments have populated (key, iv, digest) tuples.
* This does that.
*/
fun createKeyIvDigestForAttachmentsThatNeedArchiveUpload(): Int {
var count = 0
writableDatabase.select(ID, REMOTE_KEY, REMOTE_IV, REMOTE_DIGEST, DATA_FILE, DATA_RANDOM)
.from(TABLE_NAME)
.where(
"""
$ARCHIVE_TRANSFER_STATE = ${ArchiveTransferState.NONE.value} AND
$DATA_FILE NOT NULL AND
$TRANSFER_STATE = $TRANSFER_PROGRESS_DONE AND
(
$REMOTE_KEY IS NULL OR
$REMOTE_IV IS NULL OR
$REMOTE_DIGEST IS NULL
)
"""
)
.run()
.forEach { cursor ->
val attachmentId = AttachmentId(cursor.requireLong(ID))
Log.w(TAG, "[createKeyIvDigestForAttachmentsThatNeedArchiveUpload][$attachmentId] Missing key, iv, or digest. Generating.")
val key = cursor.requireString(REMOTE_KEY)?.let { Base64.decode(it) } ?: Util.getSecretBytes(64)
val iv = cursor.requireBlob(REMOTE_IV) ?: Util.getSecretBytes(16)
val digest = run {
val fileInfo = getDataFileInfo(attachmentId)!!
calculateDigest(fileInfo, key, iv)
}
writableDatabase.update(TABLE_NAME)
.values(
REMOTE_KEY to Base64.encodeWithPadding(key),
REMOTE_IV to iv,
REMOTE_DIGEST to digest
)
.where("$ID = ?", attachmentId.id)
.run()
count++
}
return count
}
/** /**
* Similar to [getAttachmentsThatNeedArchiveUpload], but returns if the list would be non-null in a more efficient way. * Similar to [getAttachmentsThatNeedArchiveUpload], but returns if the list would be non-null in a more efficient way.
*/ */
@@ -928,7 +976,7 @@ class AttachmentTable(
* @return True if we had to change the digest as part of saving the file, otherwise false. * @return True if we had to change the digest as part of saving the file, otherwise false.
*/ */
@Throws(MmsException::class) @Throws(MmsException::class)
fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: LimitedInputStream, iv: ByteArray?): Boolean { fun finalizeAttachmentAfterDownload(mmsId: Long, attachmentId: AttachmentId, inputStream: LimitedInputStream, iv: ByteArray): Boolean {
Log.i(TAG, "[finalizeAttachmentAfterDownload] Finalizing downloaded data for $attachmentId. (MessageId: $mmsId, $attachmentId)") Log.i(TAG, "[finalizeAttachmentAfterDownload] Finalizing downloaded data for $attachmentId. (MessageId: $mmsId, $attachmentId)")
val existingPlaceholder: DatabaseAttachment = getAttachment(attachmentId) ?: throw MmsException("No attachment found for id: $attachmentId") val existingPlaceholder: DatabaseAttachment = getAttachment(attachmentId) ?: throw MmsException("No attachment found for id: $attachmentId")
@@ -947,12 +995,8 @@ class AttachmentTable(
} else { } else {
Log.w(TAG, "[finalizeAttachmentAfterDownload] $attachmentId has non-zero padding bytes. Recomputing digest.") Log.w(TAG, "[finalizeAttachmentAfterDownload] $attachmentId has non-zero padding bytes. Recomputing digest.")
val stream = PaddingInputStream(getDataStream(fileWriteResult.file, fileWriteResult.random, 0), fileWriteResult.length)
val key = Base64.decode(existingPlaceholder.remoteKey!!) val key = Base64.decode(existingPlaceholder.remoteKey!!)
val cipherOutputStream = AttachmentCipherOutputStream(key, iv, NullOutputStream) calculateDigest(fileWriteResult, key, iv)
StreamUtil.copy(stream, cipherOutputStream)
cipherOutputStream.transmittedDigest
} }
val digestChanged = !digest.contentEquals(existingPlaceholder.remoteDigest) val digestChanged = !digest.contentEquals(existingPlaceholder.remoteDigest)
@@ -1606,6 +1650,37 @@ class AttachmentTable(
notifyConversationListeners(threadId) notifyConversationListeners(threadId)
} }
/**
* This will ensure that a (key/iv/digest) tuple exists for an attachment, filling each one if necessary.
*/
@Throws(IOException::class)
fun createKeyIvDigestIfNecessary(attachment: DatabaseAttachment) {
if (attachment.remoteKey != null && attachment.remoteIv != null && attachment.remoteDigest != null) {
return
}
val attachmentId = attachment.attachmentId
Log.w(TAG, "[createKeyIvDigestIfNecessary][$attachmentId] Missing one of (key, iv, digest). Filling in the gaps.")
val key = attachment.remoteKey?.let { Base64.decode(it) } ?: Util.getSecretBytes(64)
val iv = attachment.remoteIv ?: Util.getSecretBytes(16)
val digest: ByteArray = run {
val fileInfo = getDataFileInfo(attachmentId) ?: throw IOException("No data file found for $attachmentId!")
calculateDigest(fileInfo, key, iv)
}
writableDatabase
.update(TABLE_NAME)
.values(
REMOTE_KEY to Base64.encodeWithPadding(key),
REMOTE_IV to iv,
REMOTE_DIGEST to digest
)
.where("$ID = ?", attachmentId.id)
.run()
}
fun getAttachments(cursor: Cursor): List<DatabaseAttachment> { fun getAttachments(cursor: Cursor): List<DatabaseAttachment> {
return try { return try {
if (cursor.getColumnIndex(ATTACHMENT_JSON_ALIAS) != -1) { if (cursor.getColumnIndex(ATTACHMENT_JSON_ALIAS) != -1) {
@@ -1775,6 +1850,22 @@ class AttachmentTable(
.run() .run()
} }
private fun calculateDigest(fileInfo: DataFileWriteResult, key: ByteArray, iv: ByteArray): ByteArray {
return calculateDigest(file = fileInfo.file, random = fileInfo.random, length = fileInfo.length, key = key, iv = iv)
}
private fun calculateDigest(fileInfo: DataFileInfo, key: ByteArray, iv: ByteArray): ByteArray {
return calculateDigest(file = fileInfo.file, random = fileInfo.random, length = fileInfo.length, key = key, iv = iv)
}
private fun calculateDigest(file: File, random: ByteArray, length: Long, key: ByteArray, iv: ByteArray): ByteArray {
val stream = PaddingInputStream(getDataStream(file, random, 0), length)
val cipherOutputStream = AttachmentCipherOutputStream(key, iv, NullOutputStream)
StreamUtil.copy(stream, cipherOutputStream)
return cipherOutputStream.transmittedDigest
}
/** /**
* Deletes the data file if there's no strong references to other attachments. * Deletes the data file if there's no strong references to other attachments.
* If deleted, it will also clear all weak references (i.e. quotes) of the attachment. * If deleted, it will also clear all weak references (i.e. quotes) of the attachment.

View File

@@ -42,6 +42,8 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) :
val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload() val jobs = SignalDatabase.attachments.getAttachmentsThatNeedArchiveUpload()
.map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId, forBackfill = true) } .map { attachmentId -> UploadAttachmentToArchiveJob(attachmentId, forBackfill = true) }
SignalDatabase.attachments.createKeyIvDigestForAttachmentsThatNeedArchiveUpload()
SignalStore.backup.totalAttachmentUploadCount = jobs.size.toLong() SignalStore.backup.totalAttachmentUploadCount = jobs.size.toLong()
SignalStore.backup.currentAttachmentUploadCount = 0 SignalStore.backup.currentAttachmentUploadCount = 0

View File

@@ -22,6 +22,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.ArchiveThumbnailUploadJobData import org.thoughtcrime.securesms.jobs.protos.ArchiveThumbnailUploadJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.mms.DecryptableStreamUriLoader.DecryptableUri import org.thoughtcrime.securesms.mms.DecryptableStreamUriLoader.DecryptableUri
import org.thoughtcrime.securesms.net.SignalNetwork
import org.thoughtcrime.securesms.util.ImageCompressionUtil import org.thoughtcrime.securesms.util.ImageCompressionUtil
import org.thoughtcrime.securesms.util.MediaUtil import org.thoughtcrime.securesms.util.MediaUtil
import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.NetworkResult
@@ -93,13 +94,23 @@ class ArchiveThumbnailUploadJob private constructor(
val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey() val backupKey = SignalStore.svr.getOrCreateMasterKey().deriveBackupKey()
val resumableUpload = when (val result = BackupRepository.getMediaUploadSpec(secretKey = backupKey.deriveThumbnailTransitKey(attachment.getThumbnailMediaName()))) { val specResult = BackupRepository
.getAttachmentUploadForm()
.then { form ->
SignalNetwork.attachments.getResumableUploadSpec(
key = backupKey.deriveThumbnailTransitKey(attachment.getThumbnailMediaName()),
iv = attachment.remoteIv!!,
uploadForm = form
)
}
val resumableUpload = when (specResult) {
is NetworkResult.Success -> { is NetworkResult.Success -> {
Log.d(TAG, "Got an upload spec!") Log.d(TAG, "Got an upload spec!")
result.result.toProto() specResult.result.toProto()
} }
is NetworkResult.ApplicationError -> { is NetworkResult.ApplicationError -> {
Log.w(TAG, "Failed to get an upload spec due to an application error. Retrying.", result.throwable) Log.w(TAG, "Failed to get an upload spec due to an application error. Retrying.", specResult.throwable)
return Result.retry(defaultBackoff()) return Result.retry(defaultBackoff())
} }
is NetworkResult.NetworkError -> { is NetworkResult.NetworkError -> {
@@ -107,7 +118,7 @@ class ArchiveThumbnailUploadJob private constructor(
return Result.retry(defaultBackoff()) return Result.retry(defaultBackoff())
} }
is NetworkResult.StatusCodeError -> { is NetworkResult.StatusCodeError -> {
Log.w(TAG, "Failed to get an upload spec with status code ${result.code}") Log.w(TAG, "Failed to get an upload spec with status code ${specResult.code}")
return Result.retry(defaultBackoff()) return Result.retry(defaultBackoff())
} }
} }
@@ -125,7 +136,7 @@ class ArchiveThumbnailUploadJob private constructor(
val backupDirectories = BackupRepository.getCdnBackupDirectories().successOrThrow() val backupDirectories = BackupRepository.getCdnBackupDirectories().successOrThrow()
val mediaSecrets = backupKey.deriveMediaSecrets(attachment.getThumbnailMediaName()) val mediaSecrets = backupKey.deriveMediaSecrets(attachment.getThumbnailMediaName())
return when (val result = BackupRepository.archiveThumbnail(attachmentPointer, attachment)) { return when (val result = BackupRepository.copyThumbnailToArchive(attachmentPointer, attachment)) {
is NetworkResult.Success -> { is NetworkResult.Success -> {
// save attachment thumbnail // save attachment thumbnail
val archiveMediaId = attachment.archiveMediaId ?: backupKey.deriveMediaId(attachment.getMediaName()).encode() val archiveMediaId = attachment.archiveMediaId ?: backupKey.deriveMediaId(attachment.getMediaName()).encode()

View File

@@ -408,7 +408,7 @@ class AttachmentDownloadJob private constructor(
messageId, messageId,
attachmentId, attachmentId,
LimitedInputStream.withoutLimits((body.source() as Source).buffer().inputStream()), LimitedInputStream.withoutLimits((body.source() as Source).buffer().inputStream()),
iv = updatedAttachment.remoteIv iv = updatedAttachment.remoteIv!!
) )
} }
} }

View File

@@ -6,6 +6,7 @@
package org.thoughtcrime.securesms.jobs package org.thoughtcrime.securesms.jobs
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import org.signal.core.util.Stopwatch
import org.signal.core.util.logging.Log import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.backup.v2.BackupV2Event import org.thoughtcrime.securesms.backup.v2.BackupV2Event
@@ -64,11 +65,17 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
override fun onFailure() = Unit override fun onFailure() = Unit
override fun run(): Result { override fun run(): Result {
val stopwatch = Stopwatch("BackupMessagesJob")
SignalDatabase.attachments.createKeyIvDigestForAttachmentsThatNeedArchiveUpload().takeIf { it > 0 }?.let { count -> Log.w(TAG, "Needed to create $count key/iv/digests.") }
stopwatch.split("key-iv-digest")
EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.PROGRESS_MESSAGES, count = 0, estimatedTotalCount = 0)) EventBus.getDefault().postSticky(BackupV2Event(type = BackupV2Event.Type.PROGRESS_MESSAGES, count = 0, estimatedTotalCount = 0))
val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application) val tempBackupFile = BlobProvider.getInstance().forNonAutoEncryptingSingleSessionOnDisk(AppDependencies.application)
val outputStream = FileOutputStream(tempBackupFile) val outputStream = FileOutputStream(tempBackupFile)
BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false) BackupRepository.export(outputStream = outputStream, append = { tempBackupFile.appendBytes(it) }, plaintext = false)
stopwatch.split("export")
FileInputStream(tempBackupFile).use { FileInputStream(tempBackupFile).use {
when (val result = BackupRepository.uploadBackupFile(it, tempBackupFile.length())) { when (val result = BackupRepository.uploadBackupFile(it, tempBackupFile.length())) {
@@ -78,6 +85,7 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
is NetworkResult.ApplicationError -> throw result.throwable is NetworkResult.ApplicationError -> throw result.throwable
} }
} }
stopwatch.split("upload")
SignalStore.backup.lastBackupProtoSize = tempBackupFile.length() SignalStore.backup.lastBackupProtoSize = tempBackupFile.length()
if (!tempBackupFile.delete()) { if (!tempBackupFile.delete()) {
@@ -94,6 +102,8 @@ class BackupMessagesJob private constructor(parameters: Parameters) : Job(parame
} }
is NetworkResult.ApplicationError -> throw result.throwable is NetworkResult.ApplicationError -> throw result.throwable
} }
stopwatch.split("used-space")
stopwatch.stop(TAG)
if (SignalDatabase.attachments.doAnyAttachmentsNeedArchiveUpload()) { if (SignalDatabase.attachments.doAnyAttachmentsNeedArchiveUpload()) {
Log.i(TAG, "Enqueuing attachment backfill job.") Log.i(TAG, "Enqueuing attachment backfill job.")

View File

@@ -91,7 +91,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A
return Result.success() return Result.success()
} }
val result = when (val archiveResult = BackupRepository.archiveMedia(attachment)) { val result = when (val archiveResult = BackupRepository.copyAttachmentToArchive(attachment)) {
is NetworkResult.Success -> { is NetworkResult.Success -> {
Log.i(TAG, "[$attachmentId] Successfully copied the archive tier.") Log.i(TAG, "[$attachmentId] Successfully copied the archive tier.")
Result.success() Result.success()

View File

@@ -113,7 +113,7 @@ public final class JobManagerFactories {
put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory()); put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory());
put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory()); put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory());
put(AttachmentHashBackfillJob.KEY, new AttachmentHashBackfillJob.Factory()); put(AttachmentHashBackfillJob.KEY, new AttachmentHashBackfillJob.Factory());
put(AttachmentMarkUploadedJob.KEY, new AttachmentMarkUploadedJob.Factory()); put(MarkNoteToSelfAttachmentUploadedJob.KEY, new MarkNoteToSelfAttachmentUploadedJob.Factory());
put(AttachmentUploadJob.KEY, new AttachmentUploadJob.Factory()); put(AttachmentUploadJob.KEY, new AttachmentUploadJob.Factory());
put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory()); put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory());
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory()); put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());

View File

@@ -11,18 +11,18 @@ import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.jobmanager.JsonJobData; import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.Job;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Only marks an attachment as uploaded. * Marks a note to self attachment (that didn't need to be uploaded, because there's no linked devices) as being uploaded for UX purposes.
* Also generates a key/iv/digest that otherwise wouldn't exist due to the lack of upload.
*/ */
public final class AttachmentMarkUploadedJob extends BaseJob { public final class MarkNoteToSelfAttachmentUploadedJob extends BaseJob {
public static final String KEY = "AttachmentMarkUploadedJob"; public static final String KEY = "AttachmentMarkUploadedJob";
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final String TAG = Log.tag(AttachmentMarkUploadedJob.class); private static final String TAG = Log.tag(MarkNoteToSelfAttachmentUploadedJob.class);
private static final String KEY_ATTACHMENT_ID = "row_id"; private static final String KEY_ATTACHMENT_ID = "row_id";
private static final String KEY_MESSAGE_ID = "message_id"; private static final String KEY_MESSAGE_ID = "message_id";
@@ -30,7 +30,7 @@ public final class AttachmentMarkUploadedJob extends BaseJob {
private final AttachmentId attachmentId; private final AttachmentId attachmentId;
private final long messageId; private final long messageId;
public AttachmentMarkUploadedJob(long messageId, @NonNull AttachmentId attachmentId) { public MarkNoteToSelfAttachmentUploadedJob(long messageId, @NonNull AttachmentId attachmentId) {
this(new Parameters.Builder() this(new Parameters.Builder()
.setLifespan(TimeUnit.DAYS.toMillis(1)) .setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED) .setMaxAttempts(Parameters.UNLIMITED)
@@ -39,7 +39,7 @@ public final class AttachmentMarkUploadedJob extends BaseJob {
attachmentId); attachmentId);
} }
private AttachmentMarkUploadedJob(@NonNull Parameters parameters, long messageId, @NonNull AttachmentId attachmentId) { private MarkNoteToSelfAttachmentUploadedJob(@NonNull Parameters parameters, long messageId, @NonNull AttachmentId attachmentId) {
super(parameters); super(parameters);
this.attachmentId = attachmentId; this.attachmentId = attachmentId;
this.messageId = messageId; this.messageId = messageId;
@@ -59,14 +59,14 @@ public final class AttachmentMarkUploadedJob extends BaseJob {
@Override @Override
public void onRun() throws Exception { public void onRun() throws Exception {
AttachmentTable database = SignalDatabase.attachments(); DatabaseAttachment databaseAttachment = SignalDatabase.attachments().getAttachment(attachmentId);
DatabaseAttachment databaseAttachment = database.getAttachment(attachmentId);
if (databaseAttachment == null) { if (databaseAttachment == null) {
throw new InvalidAttachmentException("Cannot find the specified attachment."); throw new InvalidAttachmentException("Cannot find the specified attachment.");
} }
database.markAttachmentUploaded(messageId, databaseAttachment); SignalDatabase.attachments().markAttachmentUploaded(messageId, databaseAttachment);
SignalDatabase.attachments().createKeyIvDigestIfNecessary(databaseAttachment);
} }
@Override @Override
@@ -75,7 +75,7 @@ public final class AttachmentMarkUploadedJob extends BaseJob {
@Override @Override
protected boolean onShouldRetry(@NonNull Exception exception) { protected boolean onShouldRetry(@NonNull Exception exception) {
return exception instanceof IOException; return false;
} }
private class InvalidAttachmentException extends Exception { private class InvalidAttachmentException extends Exception {
@@ -84,14 +84,14 @@ public final class AttachmentMarkUploadedJob extends BaseJob {
} }
} }
public static final class Factory implements Job.Factory<AttachmentMarkUploadedJob> { public static final class Factory implements Job.Factory<MarkNoteToSelfAttachmentUploadedJob> {
@Override @Override
public @NonNull AttachmentMarkUploadedJob create(@NonNull Parameters parameters, @Nullable byte[] serializedData) { public @NonNull MarkNoteToSelfAttachmentUploadedJob create(@NonNull Parameters parameters, @Nullable byte[] serializedData) {
JsonJobData data = JsonJobData.deserialize(serializedData); JsonJobData data = JsonJobData.deserialize(serializedData);
return new AttachmentMarkUploadedJob(parameters, return new MarkNoteToSelfAttachmentUploadedJob(parameters,
data.getLong(KEY_MESSAGE_ID), data.getLong(KEY_MESSAGE_ID),
new AttachmentId(data.getLong(KEY_ATTACHMENT_ID))); new AttachmentId(data.getLong(KEY_ATTACHMENT_ID)));
} }
} }
} }

View File

@@ -5,6 +5,7 @@
package org.thoughtcrime.securesms.jobs package org.thoughtcrime.securesms.jobs
import org.signal.core.util.Base64
import org.signal.core.util.logging.Log import org.signal.core.util.logging.Log
import org.signal.protos.resumableuploads.ResumableUpload import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.AttachmentId
@@ -100,6 +101,12 @@ class UploadAttachmentToArchiveJob private constructor(
return Result.success() return Result.success()
} }
if (attachment.remoteKey == null || attachment.remoteIv == null) {
Log.w(TAG, "[$attachmentId] Attachment is missing remote key or IV! Cannot upload.")
SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.NONE)
return Result.failure()
}
if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) { if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) {
Log.w(TAG, "[$attachmentId] Upload spec expired! Clearing.") Log.w(TAG, "[$attachmentId] Upload spec expired! Clearing.")
uploadSpec = null uploadSpec = null
@@ -108,7 +115,7 @@ class UploadAttachmentToArchiveJob private constructor(
if (uploadSpec == null) { if (uploadSpec == null) {
Log.d(TAG, "[$attachmentId] Need an upload spec. Fetching...") Log.d(TAG, "[$attachmentId] Need an upload spec. Fetching...")
val (spec, result) = fetchResumableUploadSpec() val (spec, result) = fetchResumableUploadSpec(key = Base64.decode(attachment.remoteKey), iv = attachment.remoteIv)
if (result != null) { if (result != null) {
return result return result
} }
@@ -154,15 +161,19 @@ class UploadAttachmentToArchiveJob private constructor(
override fun onFailure() = Unit override fun onFailure() = Unit
private fun fetchResumableUploadSpec(): Pair<ResumableUpload?, Result?> { private fun fetchResumableUploadSpec(key: ByteArray, iv: ByteArray): Pair<ResumableUpload?, Result?> {
return when (val spec = BackupRepository.getMediaUploadSpec()) { val uploadSpec = BackupRepository
.getAttachmentUploadForm()
.then { form -> SignalNetwork.attachments.getResumableUploadSpec(key, iv, form) }
return when (uploadSpec) {
is NetworkResult.Success -> { is NetworkResult.Success -> {
Log.d(TAG, "[$attachmentId] Got an upload spec!") Log.d(TAG, "[$attachmentId] Got an upload spec!")
spec.result.toProto() to null uploadSpec.result.toProto() to null
} }
is NetworkResult.ApplicationError -> { is NetworkResult.ApplicationError -> {
Log.w(TAG, "[$attachmentId] Failed to get an upload spec due to an application error. Retrying.", spec.throwable) Log.w(TAG, "[$attachmentId] Failed to get an upload spec due to an application error. Retrying.", uploadSpec.throwable)
return null to Result.retry(defaultBackoff()) return null to Result.retry(defaultBackoff())
} }
@@ -172,9 +183,9 @@ class UploadAttachmentToArchiveJob private constructor(
} }
is NetworkResult.StatusCodeError -> { is NetworkResult.StatusCodeError -> {
Log.w(TAG, "[$attachmentId] Failed request with status code ${spec.code}") Log.w(TAG, "[$attachmentId] Failed request with status code ${uploadSpec.code}")
when (ArchiveMediaUploadFormStatusCodes.from(spec.code)) { when (ArchiveMediaUploadFormStatusCodes.from(uploadSpec.code)) {
ArchiveMediaUploadFormStatusCodes.BadArguments, ArchiveMediaUploadFormStatusCodes.BadArguments,
ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature, ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature,
ArchiveMediaUploadFormStatusCodes.InsufficientPermissions, ArchiveMediaUploadFormStatusCodes.InsufficientPermissions,

View File

@@ -50,7 +50,7 @@ import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob; import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob;
import org.thoughtcrime.securesms.jobs.AttachmentCopyJob; import org.thoughtcrime.securesms.jobs.AttachmentCopyJob;
import org.thoughtcrime.securesms.jobs.AttachmentMarkUploadedJob; import org.thoughtcrime.securesms.jobs.MarkNoteToSelfAttachmentUploadedJob;
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob; import org.thoughtcrime.securesms.jobs.AttachmentUploadJob;
import org.thoughtcrime.securesms.jobs.IndividualSendJob; import org.thoughtcrime.securesms.jobs.IndividualSendJob;
import org.thoughtcrime.securesms.jobs.ProfileKeySendJob; import org.thoughtcrime.securesms.jobs.ProfileKeySendJob;
@@ -643,9 +643,9 @@ public class MessageSender {
.map(a -> AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1)) .map(a -> AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1))
.toList(); .toList();
List<AttachmentMarkUploadedJob> fakeUploadJobs = Stream.of(attachments) List<MarkNoteToSelfAttachmentUploadedJob> fakeUploadJobs = Stream.of(attachments)
.map(a -> new AttachmentMarkUploadedJob(messageId, ((DatabaseAttachment) a).attachmentId)) .map(a -> new MarkNoteToSelfAttachmentUploadedJob(messageId, ((DatabaseAttachment) a).attachmentId))
.toList(); .toList();
AppDependencies.getJobManager().startChain(compressionJobs) AppDependencies.getJobManager().startChain(compressionJobs)
.then(fakeUploadJobs) .then(fakeUploadJobs)

View File

@@ -17,7 +17,6 @@ import org.whispersystems.signalservice.api.backup.BackupKey
import org.whispersystems.signalservice.api.push.ServiceId.ACI import org.whispersystems.signalservice.api.push.ServiceId.ACI
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm import org.whispersystems.signalservice.internal.push.AttachmentUploadForm
import org.whispersystems.signalservice.internal.push.PushServiceSocket import org.whispersystems.signalservice.internal.push.PushServiceSocket
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import java.io.InputStream import java.io.InputStream
import java.time.Instant import java.time.Instant
@@ -146,7 +145,11 @@ class ArchiveApi(private val pushServiceSocket: PushServiceSocket) {
/** /**
* Retrieves an [AttachmentUploadForm] that can be used to upload pre-existing media to the archive. * Retrieves an [AttachmentUploadForm] that can be used to upload pre-existing media to the archive.
* After uploading, the media still needs to be copied via [archiveAttachmentMedia]. *
* This is basically the same as [org.whispersystems.signalservice.api.attachment.AttachmentApi.getAttachmentV4UploadForm], but with a relaxed rate limit
* so we can request them more often (which is required for backfilling).
*
* After uploading, the media still needs to be copied via [copyAttachmentToArchive].
*/ */
fun getMediaUploadForm(backupKey: BackupKey, aci: ACI, serviceCredential: ArchiveServiceCredential): NetworkResult<AttachmentUploadForm> { fun getMediaUploadForm(backupKey: BackupKey, aci: ACI, serviceCredential: ArchiveServiceCredential): NetworkResult<AttachmentUploadForm> {
return NetworkResult.fromFetch { return NetworkResult.fromFetch {
@@ -156,16 +159,6 @@ class ArchiveApi(private val pushServiceSocket: PushServiceSocket) {
} }
} }
fun getResumableUploadSpec(uploadForm: AttachmentUploadForm, secretKey: ByteArray?): NetworkResult<ResumableUploadSpec> {
return NetworkResult.fromFetch {
if (secretKey == null) {
pushServiceSocket.getResumableUploadSpec(uploadForm)
} else {
pushServiceSocket.getResumableUploadSpecWithKey(uploadForm, secretKey)
}
}
}
/** /**
* Retrieves all media items in the user's archive. Note that this could be a very large number of items, making this only suitable for debugging. * Retrieves all media items in the user's archive. Note that this could be a very large number of items, making this only suitable for debugging.
* Use [getArchiveMediaItemsPage] in production. * Use [getArchiveMediaItemsPage] in production.
@@ -210,7 +203,7 @@ class ArchiveApi(private val pushServiceSocket: PushServiceSocket) {
* 413: No media space remaining * 413: No media space remaining
* 429: Rate-limited * 429: Rate-limited
*/ */
fun archiveAttachmentMedia( fun copyAttachmentToArchive(
backupKey: BackupKey, backupKey: BackupKey,
aci: ACI, aci: ACI,
serviceCredential: ArchiveServiceCredential, serviceCredential: ArchiveServiceCredential,
@@ -227,7 +220,7 @@ class ArchiveApi(private val pushServiceSocket: PushServiceSocket) {
/** /**
* Copy and re-encrypt media from the attachments cdn into the backup cdn. * Copy and re-encrypt media from the attachments cdn into the backup cdn.
*/ */
fun archiveAttachmentMedia( fun copyAttachmentToArchive(
backupKey: BackupKey, backupKey: BackupKey,
aci: ACI, aci: ACI,
serviceCredential: ArchiveServiceCredential, serviceCredential: ArchiveServiceCredential,