From ce87b50a07b9ebbf372d219ed7f7e3cc66cd2757 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Thu, 26 Mar 2026 16:35:02 -0400 Subject: [PATCH] Add create-and-upload to important attachment upload flows. Co-authored-by: Greyson Parrelli --- .../attachments/AttachmentUploadUtil.kt | 32 +++- .../securesms/backup/v2/BackupRepository.kt | 29 ++- .../jobs/ArchiveThumbnailUploadJob.kt | 78 ++++----- .../securesms/jobs/AttachmentUploadJob.kt | 68 +++++--- .../securesms/jobs/BackupMessagesJob.kt | 165 +++++++++--------- .../jobs/UploadAttachmentToArchiveJob.kt | 103 ++++++----- .../linkdevice/LinkDeviceRepository.kt | 24 +-- .../signalservice/api/archive/ArchiveApi.kt | 38 +++- .../api/attachment/AttachmentApi.kt | 122 ++++++++++--- .../api/crypto/AttachmentCipherStreamUtil.kt | 20 +++ .../internal/push/PushAttachmentData.kt | 2 +- .../internal/push/PushServiceSocket.java | 81 ++++++++- 12 files changed, 497 insertions(+), 265 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt b/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt index f6e4b1c495..08bf5d00ea 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt @@ -8,16 +8,19 @@ package org.thoughtcrime.securesms.attachments import android.content.Context import android.graphics.Bitmap import org.signal.blurhash.BlurHashEncoder +import org.signal.core.util.Base64 import org.signal.core.util.logging.Log import org.signal.core.util.mebiBytes -import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.mms.PartAuthority import org.thoughtcrime.securesms.util.MediaUtil +import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream -import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec +import org.whispersystems.signalservice.internal.crypto.PaddingInputStream import java.io.IOException +import java.io.InputStream +import java.security.MessageDigest import java.util.Objects /** @@ -32,6 +35,29 @@ object AttachmentUploadUtil { */ val FOREGROUND_LIMIT_BYTES: Long = 10.mebiBytes.inWholeBytes + /** + * Computes the base64-encoded SHA-256 checksum of the ciphertext that would result from encrypting [plaintextStream] + * with the given [key] and [iv], including padding, IV prefix, and HMAC suffix. + */ + fun computeCiphertextChecksum(key: ByteArray, iv: ByteArray, plaintextStream: InputStream, plaintextSize: Long): String { + val paddedStream = PaddingInputStream(plaintextStream, plaintextSize) + return Base64.encodeWithPadding(AttachmentCipherStreamUtil.computeCiphertextSha256(key, iv, paddedStream)) + } + + /** + * Computes the base64-encoded SHA-256 checksum of the raw bytes in [inputStream]. + * Used for pre-encrypted uploads where the data is already in its final form. + */ + fun computeRawChecksum(inputStream: InputStream): String { + val digest = MessageDigest.getInstance("SHA-256") + val buffer = ByteArray(16 * 1024) + var read: Int + while (inputStream.read(buffer).also { read = it } != -1) { + digest.update(buffer, 0, read) + } + return Base64.encodeWithPadding(digest.digest()) + } + /** * Builds a [SignalServiceAttachmentStream] from the provided data, which can then be provided to various upload methods. */ @@ -39,7 +65,6 @@ object AttachmentUploadUtil { fun buildSignalServiceAttachmentStream( context: Context, attachment: Attachment, - uploadSpec: ResumableUpload, cancellationSignal: (() -> Boolean)? = null, progressListener: ProgressListener? = null ): SignalServiceAttachmentStream { @@ -57,7 +82,6 @@ object AttachmentUploadUtil { .withHeight(attachment.height) .withUploadTimestamp(System.currentTimeMillis()) .withCaption(attachment.caption) - .withResumableUploadSpec(ResumableUploadSpec.from(uploadSpec)) .withCancelationSignal(cancellationSignal) .withListener(progressListener) .withUuid(attachment.uuid) diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt index 0fed6e2325..abe2f1d56f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/v2/BackupRepository.kt @@ -67,7 +67,6 @@ import org.signal.libsignal.zkgroup.VerificationFailedException import org.signal.libsignal.zkgroup.backups.BackupLevel import org.signal.libsignal.zkgroup.profiles.ProfileKey import org.thoughtcrime.securesms.R -import org.thoughtcrime.securesms.attachments.Attachment import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.Cdn import org.thoughtcrime.securesms.attachments.DatabaseAttachment @@ -1649,6 +1648,13 @@ object BackupRepository { } } + fun getMessageBackupUploadForm(backupFileSize: Long): NetworkResult { + return initBackupAndFetchAuth() + .then { credential -> + SignalNetwork.archive.getMessageBackupUploadForm(SignalStore.account.requireAci(), credential.messageBackupAccess, backupFileSize) + } + } + fun downloadBackupFile(destination: File, listener: ProgressListener? = null): NetworkResult { return initBackupAndFetchAuth() .then { credential -> @@ -1688,7 +1694,6 @@ object BackupRepository { /** * Retrieves an [AttachmentUploadForm] that can be used to upload an attachment to the transit cdn. - * To continue the upload, use [org.whispersystems.signalservice.api.attachment.AttachmentApi.getResumableUploadSpec]. * * It's important to note that in order to get this to the archive cdn, you still need to use [copyAttachmentToArchive]. */ @@ -1726,10 +1731,10 @@ object BackupRepository { /** * Copies a thumbnail that has been uploaded to the transit cdn to the archive cdn. */ - fun copyThumbnailToArchive(thumbnailAttachment: Attachment, parentAttachment: DatabaseAttachment): NetworkResult { + fun copyThumbnailToArchive(thumbnail: UploadedThumbnailInfo, parentAttachment: DatabaseAttachment): NetworkResult { return initBackupAndFetchAuth() .then { credential -> - val request = thumbnailAttachment.toArchiveMediaRequest(parentAttachment.requireThumbnailMediaName(), credential.mediaBackupAccess.backupKey) + val request = buildArchiveMediaRequest(thumbnail.cdnNumber, thumbnail.remoteLocation, thumbnail.size, parentAttachment.requireThumbnailMediaName(), credential.mediaBackupAccess.backupKey) SignalNetwork.archive.copyAttachmentToArchive( aci = SignalStore.account.requireAci(), @@ -1746,7 +1751,7 @@ object BackupRepository { return initBackupAndFetchAuth() .then { credential -> val mediaName = attachment.requireMediaName() - val request = attachment.toArchiveMediaRequest(mediaName, credential.mediaBackupAccess.backupKey) + val request = buildArchiveMediaRequest(attachment.cdn.cdnNumber, attachment.remoteLocation!!, attachment.size, mediaName, credential.mediaBackupAccess.backupKey) SignalNetwork.archive .copyAttachmentToArchive( aci = SignalStore.account.requireAci(), @@ -2197,15 +2202,15 @@ object BackupRepository { val profileKey: ProfileKey ) - private fun Attachment.toArchiveMediaRequest(mediaName: MediaName, mediaRootBackupKey: MediaRootBackupKey): ArchiveMediaRequest { + private fun buildArchiveMediaRequest(cdnNumber: Int, remoteLocation: String, plaintextSize: Long, mediaName: MediaName, mediaRootBackupKey: MediaRootBackupKey): ArchiveMediaRequest { val mediaSecrets = mediaRootBackupKey.deriveMediaSecrets(mediaName) return ArchiveMediaRequest( sourceAttachment = ArchiveMediaRequest.SourceAttachment( - cdn = cdn.cdnNumber, - key = remoteLocation!! + cdn = cdnNumber, + key = remoteLocation ), - objectLength = AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(size)).toInt(), + objectLength = AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(plaintextSize)).toInt(), mediaId = mediaSecrets.id.encode(), hmacKey = Base64.encodeWithPadding(mediaSecrets.macKey), encryptionKey = Base64.encodeWithPadding(mediaSecrets.aesKey) @@ -2618,3 +2623,9 @@ class ArchiveMediaItemIterator(private val cursor: Cursor) : Iterator - SignalNetwork.attachments.getResumableUploadSpec( - key = mediaRootBackupKey.deriveThumbnailTransitKey(attachment.requireThumbnailMediaName()), - iv = Util.getSecretBytes(16), - uploadForm = form - ) - } - - if (isCanceled) { - ArchiveDatabaseExecutor.runBlocking { - SignalDatabase.attachments.setArchiveThumbnailTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) - } - return Result.failure() - } - - val resumableUpload = when (specResult) { - is NetworkResult.Success -> { - Log.d(TAG, "Got an upload spec!") - specResult.result.toProto() - } - + val form: AttachmentUploadForm = when (val formResult = BackupRepository.getAttachmentUploadForm()) { + is NetworkResult.Success -> formResult.result is NetworkResult.ApplicationError -> { - Log.w(TAG, "Failed to get an upload spec due to an application error. Retrying.", specResult.throwable) + Log.w(TAG, "Failed to get upload form due to an application error. Retrying.", formResult.throwable) return Result.retry(defaultBackoff()) } - is NetworkResult.NetworkError -> { - Log.w(TAG, "Encountered a transient network error when getting upload spec. Retrying.") + Log.w(TAG, "Encountered a transient network error when getting upload form. Retrying.") return Result.retry(defaultBackoff()) } - is NetworkResult.StatusCodeError -> { - return when (specResult.code) { + return when (formResult.code) { 429 -> { - Log.w(TAG, "Rate limited when getting upload spec.") - Result.retry(specResult.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) + Log.w(TAG, "Rate limited when getting upload form.") + Result.retry(formResult.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) } else -> { - Log.w(TAG, "Failed to get an upload spec with status code ${specResult.code}") + Log.w(TAG, "Failed to get upload form with status code ${formResult.code}") Result.retry(defaultBackoff()) } } @@ -232,13 +207,31 @@ class ArchiveThumbnailUploadJob private constructor( return Result.failure() } + val mediaRootBackupKey = SignalStore.backup.mediaRootBackupKey + val key = mediaRootBackupKey.deriveThumbnailTransitKey(attachment.requireThumbnailMediaName()) + val iv = Util.getSecretBytes(16) + + val checksumSha256 = ByteArrayInputStream(thumbnailResult.data).use { stream -> + AttachmentUploadUtil.computeCiphertextChecksum(key, iv, stream, thumbnailResult.data.size.toLong()) + } + val attachmentPointer = try { - buildSignalServiceAttachmentStream(thumbnailResult, resumableUpload).use { stream -> - val pointer = AppDependencies.signalServiceMessageSender.uploadAttachment(stream) - PointerAttachment.forPointer(Optional.of(pointer)).get() + val uploadResult: AttachmentUploadResult = buildSignalServiceAttachmentStream(thumbnailResult).use { stream -> + when (val result = SignalNetwork.attachments.uploadAttachmentV4(form, key, iv, checksumSha256, stream)) { + is NetworkResult.Success -> result.result + is NetworkResult.ApplicationError -> throw result.throwable + is NetworkResult.NetworkError -> throw result.exception + is NetworkResult.StatusCodeError -> throw IOException("Upload failed with status ${result.code}") + } } + + UploadedThumbnailInfo( + cdnNumber = uploadResult.cdnNumber, + remoteLocation = uploadResult.remoteId.toString(), + size = uploadResult.dataSize + ) } catch (e: IOException) { - Log.w(TAG, "Failed to upload attachment", e) + Log.w(TAG, "Failed to upload thumbnail", e) return Result.retry(defaultBackoff()) } @@ -336,7 +329,7 @@ class ArchiveThumbnailUploadJob private constructor( return result } - private fun buildSignalServiceAttachmentStream(result: ImageCompressionUtil.Result, uploadSpec: ResumableUpload): SignalServiceAttachmentStream { + private fun buildSignalServiceAttachmentStream(result: ImageCompressionUtil.Result): SignalServiceAttachmentStream { return SignalServiceAttachment.newStreamBuilder() .withStream(ByteArrayInputStream(result.data)) .withContentType(result.mimeType) @@ -344,7 +337,6 @@ class ArchiveThumbnailUploadJob private constructor( .withWidth(result.width) .withHeight(result.height) .withUploadTimestamp(System.currentTimeMillis()) - .withResumableUploadSpec(ResumableUploadSpec.from(uploadSpec)) .build() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt index eeeb6f8013..d950171421 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt @@ -28,6 +28,7 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.mms.PartAuthority import org.thoughtcrime.securesms.net.NotPushRegisteredException import org.thoughtcrime.securesms.net.SignalNetwork import org.thoughtcrime.securesms.recipients.Recipient @@ -44,6 +45,7 @@ import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStre import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException import org.whispersystems.signalservice.internal.crypto.PaddingInputStream +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec import java.io.IOException import java.util.concurrent.TimeUnit import kotlin.time.Duration.Companion.days @@ -146,7 +148,7 @@ class AttachmentUploadJob private constructor( val timeSinceUpload = System.currentTimeMillis() - databaseAttachment.uploadTimestamp if (timeSinceUpload < UPLOAD_REUSE_THRESHOLD && !TextUtils.isEmpty(databaseAttachment.remoteLocation)) { - Log.i(TAG, "We can re-use an already-uploaded file. It was uploaded $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days) ago. Skipping.") + Log.i(TAG, "[$attachmentId] We can re-use an already-uploaded file. It was uploaded $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days) ago. Skipping.") SignalDatabase.attachments.setTransferState(databaseAttachment.mmsId, attachmentId, AttachmentTable.TRANSFER_PROGRESS_DONE) if (SignalStore.account.isPrimaryDevice && BackupRepository.shouldCopyAttachmentToArchive(databaseAttachment.attachmentId, databaseAttachment.mmsId)) { Log.i(TAG, "[$attachmentId] The re-used file was not copied to the archive. Copying now.") @@ -154,39 +156,50 @@ class AttachmentUploadJob private constructor( } return } else if (databaseAttachment.uploadTimestamp > 0) { - Log.i(TAG, "This file was previously-uploaded, but too long ago to be re-used. Age: $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days)") + Log.i(TAG, "[$attachmentId] This file was previously-uploaded, but too long ago to be re-used. Age: $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days)") if (databaseAttachment.archiveTransferState != AttachmentTable.ArchiveTransferState.NONE) { SignalDatabase.attachments.clearArchiveData(attachmentId) } } if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) { - Log.w(TAG, "Upload spec expired! Clearing.") + Log.w(TAG, "[$attachmentId] Upload spec expired! Clearing.") uploadSpec = null } - if (uploadSpec == null) { - Log.d(TAG, "Need an upload spec. Fetching...") - uploadSpec = SignalNetwork.attachments - .getAttachmentV4UploadForm() - .then { form -> - SignalNetwork.attachments.getResumableUploadSpec( - key = Base64.decode(databaseAttachment.remoteKey!!), - iv = Util.getSecretBytes(16), - uploadForm = form - ) - } - .successOrThrow() - .toProto() - } else { - Log.d(TAG, "Re-using existing upload spec.") - } - - Log.i(TAG, "Uploading attachment for message " + databaseAttachment.mmsId + " with ID " + databaseAttachment.attachmentId) + Log.i(TAG, "[$attachmentId] Uploading attachment for message ${databaseAttachment.mmsId}") try { + val existingSpec = uploadSpec?.let { ResumableUploadSpec.from(it) } + + val uploadForm = if (existingSpec == null) { + SignalNetwork.attachments.getAttachmentV4UploadForm().successOrThrow() + } else { + null + } + + val key = existingSpec?.attachmentKey ?: Base64.decode(databaseAttachment.remoteKey!!) + val iv = existingSpec?.attachmentIv ?: Util.getSecretBytes(16) + + val checksumSha256 = if (existingSpec == null) { + PartAuthority.getAttachmentStream(context, databaseAttachment.uri!!).use { stream -> + AttachmentUploadUtil.computeCiphertextChecksum(key, iv, stream, databaseAttachment.size) + } + } else { + null + } + getAttachmentNotificationIfNeeded(databaseAttachment).use { notification -> - buildAttachmentStream(databaseAttachment, notification, uploadSpec!!).use { localAttachment -> - val uploadResult: AttachmentUploadResult = SignalNetwork.attachments.uploadAttachmentV4(localAttachment).successOrThrow() + buildAttachmentStream(databaseAttachment, notification).use { localAttachment -> + val uploadResult: AttachmentUploadResult = SignalNetwork.attachments.uploadAttachmentV4( + form = uploadForm, + key = key, + iv = iv, + checksumSha256 = checksumSha256, + attachmentStream = localAttachment, + existingSpec = existingSpec, + onSpecCreated = { spec -> uploadSpec = spec.toProto() } + ).successOrThrow() + SignalDatabase.attachments.finalizeAttachmentAfterUpload(databaseAttachment.attachmentId, uploadResult) if (SignalStore.backup.backsUpMedia) { val messageId = SignalDatabase.attachments.getMessageId(databaseAttachment.attachmentId) @@ -235,7 +248,7 @@ class AttachmentUploadJob private constructor( throw e } catch (e: NonSuccessfulResumableUploadResponseCodeException) { if (e.code == 400) { - Log.w(TAG, "Failed to upload due to a 400 when getting resumable upload information. Clearing upload spec.", e) + Log.w(TAG, "[$attachmentId] Failed to upload due to a 400 when getting resumable upload information. Clearing upload spec.", e) uploadSpec = null } @@ -243,7 +256,7 @@ class AttachmentUploadJob private constructor( throw e } catch (e: ResumeLocationInvalidException) { - Log.w(TAG, "Resume location invalid. Clearing upload spec.", e) + Log.w(TAG, "[$attachmentId] Resume location invalid. Clearing upload spec.", e) uploadSpec = null resetProgressListeners(databaseAttachment) @@ -268,7 +281,7 @@ class AttachmentUploadJob private constructor( val database = SignalDatabase.attachments val databaseAttachment = database.getAttachment(attachmentId) if (databaseAttachment == null) { - Log.i(TAG, "Could not find attachment in DB for upload job upon failure/cancellation.") + Log.i(TAG, "[$attachmentId] Could not find attachment in DB for upload job upon failure/cancellation.") return } @@ -280,7 +293,7 @@ class AttachmentUploadJob private constructor( } @Throws(InvalidAttachmentException::class) - private fun buildAttachmentStream(attachment: Attachment, notification: AttachmentProgressService.Controller?, resumableUploadSpec: ResumableUpload): SignalServiceAttachmentStream { + private fun buildAttachmentStream(attachment: Attachment, notification: AttachmentProgressService.Controller?): SignalServiceAttachmentStream { if (attachment.uri == null || attachment.size == 0L) { throw InvalidAttachmentException(IOException("Outgoing attachment has no data!")) } @@ -289,7 +302,6 @@ class AttachmentUploadJob private constructor( AttachmentUploadUtil.buildSignalServiceAttachmentStream( context = context, attachment = attachment, - uploadSpec = resumableUploadSpec, cancellationSignal = { isCanceled }, progressListener = object : SignalServiceAttachment.ProgressListener { override fun onAttachmentProgress(progress: AttachmentTransferProgress) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt index e25d31fa30..5d7d1951ba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt @@ -25,6 +25,7 @@ import org.signal.libsignal.net.SvrBStoreResponse import org.signal.libsignal.zkgroup.VerificationFailedException import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.R +import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil import org.thoughtcrime.securesms.backup.ArchiveUploadProgress import org.thoughtcrime.securesms.backup.v2.ArchiveRestoreProgress import org.thoughtcrime.securesms.backup.v2.ArchiveValidator @@ -294,49 +295,45 @@ class BackupMessagesJob private constructor( this.syncTime = currentTime this.dataFile = tempBackupFile.path - val backupSpec: ResumableMessagesBackupUploadSpec = resumableMessagesBackupUploadSpec ?: when (val result = BackupRepository.getResumableMessagesBackupUploadSpec(tempBackupFile.length())) { - is NetworkResult.Success -> { - Log.i(TAG, "Successfully generated a new upload spec.", true) + val existingSpec = resumableMessagesBackupUploadSpec + val form: AttachmentUploadForm = if (existingSpec == null) { + when (val result = BackupRepository.getMessageBackupUploadForm(tempBackupFile.length())) { + is NetworkResult.Success -> result.result + is NetworkResult.NetworkError -> { + Log.i(TAG, "Network failure", result.getCause(), true) + return Result.retry(defaultBackoff()) + } + is NetworkResult.StatusCodeError -> { + when (result.code) { + 413 -> { + Log.i(TAG, "Backup file is too large! Size: ${tempBackupFile.length()} bytes. Current threshold: ${SignalStore.backup.messageCuttoffDuration}", result.getCause(), true) + tempBackupFile.delete() + this.dataFile = "" + BackupRepository.markBackupCreationFailed(BackupValues.BackupCreationError.BACKUP_FILE_TOO_LARGE) + backupErrorHandled = true - val spec = result.result - resumableMessagesBackupUploadSpec = spec - spec - } - - is NetworkResult.NetworkError -> { - Log.i(TAG, "Network failure", result.getCause(), true) - return Result.retry(defaultBackoff()) - } - - is NetworkResult.StatusCodeError -> { - when (result.code) { - 413 -> { - Log.i(TAG, "Backup file is too large! Size: ${tempBackupFile.length()} bytes. Current threshold: ${SignalStore.backup.messageCuttoffDuration}", result.getCause(), true) - tempBackupFile.delete() - this.dataFile = "" - BackupRepository.markBackupCreationFailed(BackupValues.BackupCreationError.BACKUP_FILE_TOO_LARGE) - backupErrorHandled = true - - if (SignalStore.backup.messageCuttoffDuration == null) { - Log.i(TAG, "Setting message cuttoff duration to $TOO_LARGE_MESSAGE_CUTTOFF_DURATION", true) - SignalStore.backup.messageCuttoffDuration = TOO_LARGE_MESSAGE_CUTTOFF_DURATION + if (SignalStore.backup.messageCuttoffDuration == null) { + Log.i(TAG, "Setting message cuttoff duration to $TOO_LARGE_MESSAGE_CUTTOFF_DURATION", true) + SignalStore.backup.messageCuttoffDuration = TOO_LARGE_MESSAGE_CUTTOFF_DURATION + return Result.retry(defaultBackoff()) + } else { + return Result.failure() + } + } + 429 -> { + Log.i(TAG, "Rate limited when getting upload form.", result.getCause(), true) + return Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) + } + else -> { + Log.i(TAG, "Status code failure", result.getCause(), true) return Result.retry(defaultBackoff()) - } else { - return Result.failure() } } - 429 -> { - Log.i(TAG, "Rate limited when getting upload spec.", result.getCause(), true) - return Result.retry(result.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) - } - else -> { - Log.i(TAG, "Status code failure", result.getCause(), true) - return Result.retry(defaultBackoff()) - } } + is NetworkResult.ApplicationError -> throw result.throwable } - - is NetworkResult.ApplicationError -> throw result.throwable + } else { + existingSpec.attachmentUploadForm } val progressListener = object : SignalServiceAttachment.ProgressListener { @@ -347,56 +344,58 @@ class BackupMessagesJob private constructor( override fun shouldCancel(): Boolean = isCanceled } - FileInputStream(tempBackupFile).use { fileStream -> - val uploadResult = SignalNetwork.archive.uploadBackupFile( - uploadForm = backupSpec.attachmentUploadForm, - resumableUploadUrl = backupSpec.resumableUri, + val checksumSha256 = if (existingSpec == null) { + FileInputStream(tempBackupFile).use { AttachmentUploadUtil.computeRawChecksum(it) } + } else { + null + } + + val uploadResult = FileInputStream(tempBackupFile).use { fileStream -> + SignalNetwork.archive.uploadBackupFile( + uploadForm = form, data = fileStream, dataLength = tempBackupFile.length(), - progressListener = progressListener + checksumSha256 = checksumSha256, + progressListener = progressListener, + existingResumeUrl = existingSpec?.resumableUri, + onResumeUrlCreated = { url -> + resumableMessagesBackupUploadSpec = ResumableMessagesBackupUploadSpec(attachmentUploadForm = form, resumableUri = url) + } ) - - when (uploadResult) { - is NetworkResult.Success -> { - Log.i(TAG, "Successfully uploaded backup file.", true) - if (!SignalStore.backup.hasBackupBeenUploaded) { - Log.i(TAG, "First time making a backup - scheduling a storage sync.", true) - SignalDatabase.recipients.markNeedsSync(Recipient.self().id) - StorageSyncHelper.scheduleSyncForDataChange() - } - SignalStore.backup.hasBackupBeenUploaded = true - } - - is NetworkResult.NetworkError -> { - Log.i(TAG, "Network failure", uploadResult.getCause(), true) - return if (isCanceled) { - Result.failure() - } else { - Result.retry(defaultBackoff()) - } - } - - is NetworkResult.StatusCodeError -> { - when (uploadResult.code) { - 400 -> { - Log.w(TAG, "400 likely means bad resumable state. Resetting the upload spec before retrying.", true) - resumableMessagesBackupUploadSpec = null - return Result.retry(defaultBackoff()) - } - 429 -> { - Log.w(TAG, "Rate limited when uploading backup file.", uploadResult.getCause(), true) - return Result.retry(uploadResult.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) - } - else -> { - Log.i(TAG, "Status code failure (${uploadResult.code})", uploadResult.getCause(), true) - return Result.retry(defaultBackoff()) - } - } - } - - is NetworkResult.ApplicationError -> throw uploadResult.throwable - } } + when (uploadResult) { + is NetworkResult.Success -> Unit + is NetworkResult.NetworkError -> { + Log.i(TAG, "Network failure", uploadResult.getCause(), true) + return if (isCanceled) Result.failure() else Result.retry(defaultBackoff()) + } + is NetworkResult.StatusCodeError -> { + when (uploadResult.code) { + 400 -> { + Log.w(TAG, "400 likely means bad resumable state. Resetting the upload spec before retrying.", true) + resumableMessagesBackupUploadSpec = null + return Result.retry(defaultBackoff()) + } + 429 -> { + Log.w(TAG, "Rate limited when uploading backup file.", uploadResult.getCause(), true) + return Result.retry(uploadResult.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) + } + else -> { + Log.i(TAG, "Status code failure (${uploadResult.code})", uploadResult.getCause(), true) + return Result.retry(defaultBackoff()) + } + } + } + is NetworkResult.ApplicationError -> throw uploadResult.throwable + } + + Log.i(TAG, "Successfully uploaded backup file.", true) + if (!SignalStore.backup.hasBackupBeenUploaded) { + Log.i(TAG, "First time making a backup - scheduling a storage sync.", true) + SignalDatabase.recipients.markNeedsSync(Recipient.self().id) + StorageSyncHelper.scheduleSyncForDataChange() + } + SignalStore.backup.hasBackupBeenUploaded = true stopwatch.split("upload") SignalStore.backup.nextBackupSecretData = svrBMetadata.nextBackupSecretData diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 1dc693fb12..38a1393578 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -28,6 +28,7 @@ import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.BackupMessagesConstraint import org.thoughtcrime.securesms.jobs.protos.UploadAttachmentToArchiveJobData import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.mms.PartAuthority import org.thoughtcrime.securesms.net.SignalNetwork import org.thoughtcrime.securesms.service.AttachmentProgressService import org.thoughtcrime.securesms.util.MediaUtil @@ -37,6 +38,8 @@ import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatus import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment +import org.whispersystems.signalservice.internal.push.AttachmentUploadForm +import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec import java.io.FileNotFoundException import java.io.IOException import java.net.ProtocolException @@ -224,17 +227,43 @@ class UploadAttachmentToArchiveJob private constructor( uploadSpec = null } - if (uploadSpec == null) { - Log.d(TAG, "[$attachmentId]$mediaIdLog Need an upload spec. Fetching...") + val existingSpec = uploadSpec?.let { ResumableUploadSpec.from(it) } - val (spec, result) = fetchResumableUploadSpec(key = Base64.decode(attachment.remoteKey), iv = Util.getSecretBytes(16)) - if (result != null) { - return result + val form: AttachmentUploadForm? = if (existingSpec == null) { + when (val formResult = BackupRepository.getAttachmentUploadForm()) { + is NetworkResult.Success -> formResult.result + is NetworkResult.ApplicationError -> { + Log.w(TAG, "[$attachmentId]$mediaIdLog Failed to get upload form due to an application error.", formResult.throwable) + return Result.retry(defaultBackoff()) + } + is NetworkResult.NetworkError -> { + Log.w(TAG, "[$attachmentId]$mediaIdLog Encountered a transient network error getting upload form.") + return Result.retry(defaultBackoff()) + } + is NetworkResult.StatusCodeError -> { + Log.w(TAG, "[$attachmentId]$mediaIdLog Failed to get upload form with status code ${formResult.code}") + return when (ArchiveMediaUploadFormStatusCodes.from(formResult.code)) { + ArchiveMediaUploadFormStatusCodes.RateLimited -> { + Log.w(TAG, "[$attachmentId]$mediaIdLog Rate limited when getting upload form.") + Result.retry(formResult.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) + } + else -> Result.retry(defaultBackoff()) + } + } } - - uploadSpec = spec } else { - Log.d(TAG, "[$attachmentId]$mediaIdLog Already have an upload spec. Continuing...") + null + } + + val key = existingSpec?.attachmentKey ?: Base64.decode(attachment.remoteKey!!) + val iv = existingSpec?.attachmentIv ?: Util.getSecretBytes(16) + + val checksumSha256 = if (existingSpec == null) { + PartAuthority.getAttachmentStream(context, attachment.uri!!).use { stream -> + AttachmentUploadUtil.computeCiphertextChecksum(key, iv, stream, attachment.size) + } + } else { + null } val progressServiceController = if (attachment.size >= AttachmentUploadUtil.FOREGROUND_LIMIT_BYTES) { @@ -249,7 +278,6 @@ class UploadAttachmentToArchiveJob private constructor( AttachmentUploadUtil.buildSignalServiceAttachmentStream( context = context, attachment = attachment, - uploadSpec = uploadSpec!!, cancellationSignal = { this.isCanceled }, progressListener = object : SignalServiceAttachment.ProgressListener { override fun onAttachmentProgress(progress: AttachmentTransferProgress) { @@ -273,8 +301,18 @@ class UploadAttachmentToArchiveJob private constructor( Log.d(TAG, "[$attachmentId]$mediaIdLog Beginning upload...") progressServiceController.use { - val uploadResult: AttachmentUploadResult = attachmentStream.use { managedAttachmentStream -> - when (val result = SignalNetwork.attachments.uploadAttachmentV4(managedAttachmentStream)) { + val uploadResult: AttachmentUploadResult = attachmentStream.use { stream -> + when ( + val result = SignalNetwork.attachments.uploadAttachmentV4( + form = form, + key = key, + iv = iv, + checksumSha256 = checksumSha256, + attachmentStream = stream, + existingSpec = existingSpec, + onSpecCreated = { spec -> uploadSpec = spec.toProto() } + ) + ) { is NetworkResult.Success -> result.result is NetworkResult.ApplicationError -> throw result.throwable is NetworkResult.NetworkError -> { @@ -348,49 +386,6 @@ class UploadAttachmentToArchiveJob private constructor( } } - private fun fetchResumableUploadSpec(key: ByteArray, iv: ByteArray): Pair { - val uploadSpec = BackupRepository - .getAttachmentUploadForm() - .then { form -> SignalNetwork.attachments.getResumableUploadSpec(key, iv, form) } - - return when (uploadSpec) { - is NetworkResult.Success -> { - Log.d(TAG, "[$attachmentId] Got an upload spec!") - uploadSpec.result.toProto() to null - } - - is NetworkResult.ApplicationError -> { - Log.w(TAG, "[$attachmentId] Failed to get an upload spec due to an application error. Retrying.", uploadSpec.throwable) - return null to Result.retry(defaultBackoff()) - } - - is NetworkResult.NetworkError -> { - Log.w(TAG, "[$attachmentId] Encountered a transient network error. Retrying.") - return null to Result.retry(defaultBackoff()) - } - - is NetworkResult.StatusCodeError -> { - Log.w(TAG, "[$attachmentId] Failed request with status code ${uploadSpec.code}") - - when (ArchiveMediaUploadFormStatusCodes.from(uploadSpec.code)) { - ArchiveMediaUploadFormStatusCodes.BadArguments, - ArchiveMediaUploadFormStatusCodes.InvalidPresentationOrSignature, - ArchiveMediaUploadFormStatusCodes.InsufficientPermissions -> { - return null to Result.retry(defaultBackoff()) - } - ArchiveMediaUploadFormStatusCodes.RateLimited -> { - Log.w(TAG, "[$attachmentId] Rate limited when getting upload form.") - return null to Result.retry(uploadSpec.retryAfter()?.inWholeMilliseconds ?: defaultBackoff()) - } - - ArchiveMediaUploadFormStatusCodes.Unknown -> { - return null to Result.retry(defaultBackoff()) - } - } - } - } - } - private fun setArchiveTransferStateWithDelayedNotification(attachmentId: AttachmentId, transferState: AttachmentTable.ArchiveTransferState) { SignalDatabase.attachments.setArchiveTransferState(attachmentId, transferState, notify = false) ArchiveDatabaseExecutor.throttledNotifyAttachmentObservers() diff --git a/app/src/main/java/org/thoughtcrime/securesms/linkdevice/LinkDeviceRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/linkdevice/LinkDeviceRepository.kt index 9ee5256541..b5e7ae3eb2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/linkdevice/LinkDeviceRepository.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/linkdevice/LinkDeviceRepository.kt @@ -12,6 +12,7 @@ import org.signal.core.util.logging.logW import org.signal.core.util.toByteArray import org.signal.libsignal.protocol.InvalidKeyException import org.signal.libsignal.protocol.ecc.ECPublicKey +import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil import org.thoughtcrime.securesms.backup.BackupFileIOError import org.thoughtcrime.securesms.backup.v2.ArchiveValidator import org.thoughtcrime.securesms.backup.v2.BackupRepository @@ -22,6 +23,7 @@ import org.thoughtcrime.securesms.jobs.DeviceNameChangeJob import org.thoughtcrime.securesms.jobs.E164FormattingJob import org.thoughtcrime.securesms.jobs.LinkedDeviceInactiveCheckJob import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.linkdevice.LinkDeviceRepository.createAndUploadArchive import org.thoughtcrime.securesms.net.SignalNetwork import org.thoughtcrime.securesms.providers.BlobProvider import org.thoughtcrime.securesms.registration.secondary.DeviceNameCipher @@ -393,23 +395,25 @@ object LinkDeviceRepository { * Handles uploading the archive for [createAndUploadArchive]. Handles resumable uploads and making multiple upload attempts. */ private fun uploadArchive(backupFile: File, uploadForm: AttachmentUploadForm): NetworkResult { - val resumableUploadUrl = when (val result = NetworkResult.withRetry { SignalNetwork.attachments.getResumableUploadUrl(uploadForm) }) { - is NetworkResult.Success -> result.result - is NetworkResult.NetworkError -> return result.map { Unit }.logW(TAG, "Network error when fetching upload URL.", result.exception) - is NetworkResult.StatusCodeError -> return result.map { Unit }.logW(TAG, "Status code error when fetching upload URL.", result.exception) - is NetworkResult.ApplicationError -> throw result.throwable - } + val checksumSha256 = FileInputStream(backupFile).use { AttachmentUploadUtil.computeRawChecksum(it) } + var resumeUrl: String? = null val uploadResult = NetworkResult.withRetry( logAttempt = { attempt, maxAttempts -> Log.i(TAG, "Starting upload attempt ${attempt + 1}/$maxAttempts") } ) { FileInputStream(backupFile).use { - SignalNetwork.attachments.uploadPreEncryptedFileToAttachmentV4( + val result = SignalNetwork.archive.uploadBackupFile( uploadForm = uploadForm, - resumableUploadUrl = resumableUploadUrl, - inputStream = it, - inputStreamLength = backupFile.length() + data = it, + dataLength = backupFile.length(), + checksumSha256 = checksumSha256, + existingResumeUrl = resumeUrl, + onResumeUrlCreated = { url -> resumeUrl = url } ) + if (result !is NetworkResult.Success) { + resumeUrl = null + } + result } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt index 06b96361ed..fea4048bdc 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/archive/ArchiveApi.kt @@ -10,6 +10,7 @@ import org.signal.core.models.backup.BackupKey import org.signal.core.models.backup.MediaRootBackupKey import org.signal.core.models.backup.MessageBackupKey import org.signal.core.util.isNotNullOrBlank +import org.signal.core.util.logging.Log import org.signal.libsignal.protocol.ecc.ECPrivateKey import org.signal.libsignal.protocol.ecc.ECPublicKey import org.signal.libsignal.zkgroup.GenericServerPublicParams @@ -44,6 +45,10 @@ class ArchiveApi( private val pushServiceSocket: PushServiceSocket ) { + companion object { + private val TAG = Log.tag(ArchiveApi::class) + } + private val backupServerPublicParams: GenericServerPublicParams = GenericServerPublicParams(pushServiceSocket.configuration.backupServerPublicParams) /** @@ -236,11 +241,38 @@ class ArchiveApi( } /** - * Uploads your main backup file to cloud storage. + * Uploads a pre-encrypted backup file, automatically choosing the best upload strategy based on CDN version. + * For CDN3, uses TUS "Creation With Upload" (single POST). For other CDNs, falls back to the legacy + * resumable upload flow. + * + * If [existingResumeUrl] is provided, the upload resumes using the existing URL (HEAD+PATCH). + * Otherwise, a new upload is initiated and [onResumeUrlCreated] is called with the resumable URL + * before the upload begins, allowing callers to persist it for crash recovery. */ - fun uploadBackupFile(uploadForm: AttachmentUploadForm, resumableUploadUrl: String, data: InputStream, dataLength: Long, progressListener: SignalServiceAttachment.ProgressListener? = null): NetworkResult { + fun uploadBackupFile( + uploadForm: AttachmentUploadForm, + data: InputStream, + dataLength: Long, + checksumSha256: String? = null, + progressListener: SignalServiceAttachment.ProgressListener? = null, + existingResumeUrl: String? = null, + onResumeUrlCreated: ((String) -> Unit)? = null + ): NetworkResult { return NetworkResult.fromFetch { - pushServiceSocket.uploadBackupFile(uploadForm, resumableUploadUrl, data, dataLength, progressListener) + if (existingResumeUrl != null) { + Log.i(TAG, "Resuming backup upload via HEAD+PATCH") + pushServiceSocket.uploadBackupFile(uploadForm, existingResumeUrl, data, dataLength, progressListener) + } else if (uploadForm.cdn == 3) { + Log.i(TAG, "Fresh backup upload via creation-with-upload (CDN3)") + val resumeUrl = uploadForm.signedUploadLocation + "/" + uploadForm.key + onResumeUrlCreated?.invoke(resumeUrl) + pushServiceSocket.uploadBackupFile(uploadForm, checksumSha256, data, dataLength, progressListener, null) + } else { + Log.i(TAG, "Fresh backup upload via legacy flow (CDN${uploadForm.cdn})") + val resumeUrl = pushServiceSocket.getResumableUploadUrl(uploadForm, checksumSha256) + onResumeUrlCreated?.invoke(resumeUrl) + pushServiceSocket.uploadBackupFile(uploadForm, resumeUrl, data, dataLength, progressListener) + } } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt index bea50827db..4e58bb972a 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/attachment/AttachmentApi.kt @@ -5,6 +5,7 @@ package org.whispersystems.signalservice.api.attachment +import org.signal.core.util.logging.Log import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId @@ -28,6 +29,11 @@ class AttachmentApi( private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket, private val pushServiceSocket: PushServiceSocket ) { + + companion object { + private val TAG: String = Log.tag(AttachmentApi::class) + } + /** * Gets a v4 attachment upload form, which provides the necessary information to upload an attachment. * @@ -41,24 +47,6 @@ class AttachmentApi( return NetworkResult.fromWebSocketRequest(authWebSocket, request, AttachmentUploadForm::class) } - /** - * Gets a resumable upload spec, which can be saved and re-used across upload attempts to resume upload progress. - */ - fun getResumableUploadSpec(key: ByteArray, iv: ByteArray, uploadForm: AttachmentUploadForm): NetworkResult { - return getResumableUploadUrl(uploadForm) - .map { url -> - ResumableUploadSpec( - attachmentKey = key, - attachmentIv = iv, - cdnKey = uploadForm.key, - cdnNumber = uploadForm.cdn, - resumeLocation = url, - expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS, - headers = uploadForm.headers - ) - } - } - /** * Uploads an attachment using the v4 upload scheme. */ @@ -88,8 +76,8 @@ class AttachmentApi( val digestInfo = pushServiceSocket.uploadAttachment(attachmentData) AttachmentUploadResult( - remoteId = SignalServiceAttachmentRemoteId.V4(attachmentData.resumableUploadSpec.cdnKey), - cdnNumber = attachmentData.resumableUploadSpec.cdnNumber, + remoteId = SignalServiceAttachmentRemoteId.V4(resumableUploadSpec.cdnKey), + cdnNumber = resumableUploadSpec.cdnNumber, key = resumableUploadSpec.attachmentKey, digest = digestInfo.digest, incrementalDigest = digestInfo.incrementalDigest, @@ -102,18 +90,94 @@ class AttachmentApi( } /** - * Uploads a raw file using the v4 upload scheme. No additional encryption is supplied! Always prefer [uploadAttachmentV4], unless you are using a separate - * encryption scheme (i.e. like backup files). + * Uploads an encrypted attachment, automatically choosing the best upload strategy based on CDN version. + * For CDN3, uses TUS "Creation With Upload" (single POST). For other CDNs, falls back to the legacy + * resumable upload flow (POST create + HEAD + PATCH). + * + * If [existingSpec] is provided, the upload resumes using the existing resumable upload URL (HEAD+PATCH) + * and [form] is not required. + * Otherwise, [form] is required, a new upload is initiated, and [onSpecCreated] is called with the + * [ResumableUploadSpec] before the upload begins, allowing callers to persist it for crash recovery. */ - fun uploadPreEncryptedFileToAttachmentV4(uploadForm: AttachmentUploadForm, resumableUploadUrl: String, inputStream: InputStream, inputStreamLength: Long): NetworkResult { + fun uploadAttachmentV4( + form: AttachmentUploadForm? = null, + key: ByteArray, + iv: ByteArray, + checksumSha256: String?, + attachmentStream: SignalServiceAttachmentStream, + existingSpec: ResumableUploadSpec? = null, + onSpecCreated: ((ResumableUploadSpec) -> Unit)? = null + ): NetworkResult { return NetworkResult.fromFetch { - pushServiceSocket.uploadBackupFile(uploadForm, resumableUploadUrl, inputStream, inputStreamLength) - } - } + require(existingSpec != null || form != null) { "Either existingSpec or form must be provided" } - fun getResumableUploadUrl(uploadForm: AttachmentUploadForm): NetworkResult { - return NetworkResult.fromFetch { - pushServiceSocket.getResumableUploadUrl(uploadForm) + val paddedLength = PaddingInputStream.getPaddedSize(attachmentStream.length) + val dataStream: InputStream = PaddingInputStream(attachmentStream.inputStream, attachmentStream.length) + val ciphertextLength = AttachmentCipherStreamUtil.getCiphertextLength(paddedLength) + + val effectiveKey = existingSpec?.attachmentKey ?: key + val effectiveIv = existingSpec?.attachmentIv ?: iv + + val attachmentData = PushAttachmentData( + contentType = attachmentStream.contentType, + data = dataStream, + dataSize = ciphertextLength, + incremental = attachmentStream.isFaststart, + outputStreamFactory = AttachmentCipherOutputStreamFactory(effectiveKey, effectiveIv), + listener = attachmentStream.listener, + cancelationSignal = attachmentStream.cancelationSignal, + resumableUploadSpec = existingSpec + ) + + val digestInfo = if (existingSpec != null) { + Log.i(TAG, "Resuming upload via HEAD+PATCH") + pushServiceSocket.uploadAttachment(attachmentData) + } else if (form!!.cdn == 3) { + Log.i(TAG, "Fresh upload via creation-with-upload (CDN3)") + + val spec = ResumableUploadSpec( + attachmentKey = key, + attachmentIv = iv, + cdnKey = form.key, + cdnNumber = form.cdn, + resumeLocation = form.signedUploadLocation + "/" + form.key, + expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS, + headers = form.headers + ) + onSpecCreated?.invoke(spec) + + pushServiceSocket.createAndUploadToCdn3(form, checksumSha256, attachmentData) + } else { + Log.i(TAG, "Fresh upload via legacy flow (CDN${form.cdn})") + val resumeUrl = pushServiceSocket.getResumableUploadUrl(form, checksumSha256) + val spec = ResumableUploadSpec( + attachmentKey = key, + attachmentIv = iv, + cdnKey = form.key, + cdnNumber = form.cdn, + resumeLocation = resumeUrl, + expirationTimestamp = System.currentTimeMillis() + PushServiceSocket.CDN2_RESUMABLE_LINK_LIFETIME_MILLIS, + headers = form.headers + ) + onSpecCreated?.invoke(spec) + + pushServiceSocket.uploadAttachment(attachmentData.copy(resumableUploadSpec = spec)) + } + + val cdnKey = existingSpec?.cdnKey ?: form!!.key + val cdnNumber = existingSpec?.cdnNumber ?: form!!.cdn + + AttachmentUploadResult( + remoteId = SignalServiceAttachmentRemoteId.V4(cdnKey), + cdnNumber = cdnNumber, + key = key, + digest = digestInfo.digest, + incrementalDigest = digestInfo.incrementalDigest, + incrementalDigestChunkSize = digestInfo.incrementalMacChunkSize, + uploadTimestamp = attachmentStream.uploadTimestamp, + dataSize = attachmentStream.length, + blurHash = attachmentStream.blurHash.getOrNull() + ) } } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt index 5dde4752a3..7050454e13 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/crypto/AttachmentCipherStreamUtil.kt @@ -5,6 +5,9 @@ package org.whispersystems.signalservice.api.crypto +import org.signal.core.util.stream.NullOutputStream +import java.io.InputStream + object AttachmentCipherStreamUtil { /** @@ -23,4 +26,21 @@ object AttachmentCipherStreamUtil { fun getPlaintextLength(ciphertextLength: Long): Long { return ((ciphertextLength - 16 - 32) / 16 - 1) * 16 } + + /** + * Computes the SHA-256 digest of the ciphertext that would result from encrypting [plaintextStream] with the given [key] and [iv]. + * This includes the IV prefix and HMAC suffix that are part of the encrypted attachment format. + * The stream is encrypted to /dev/null -- only the digest is retained. + */ + @JvmStatic + fun computeCiphertextSha256(key: ByteArray, iv: ByteArray, plaintextStream: InputStream): ByteArray { + val cipherOutputStream = AttachmentCipherOutputStream(key, iv, NullOutputStream) + val buffer = ByteArray(16 * 1024) + var read: Int + while (plaintextStream.read(buffer).also { read = it } != -1) { + cipherOutputStream.write(buffer, 0, read) + } + cipherOutputStream.close() + return cipherOutputStream.transmittedDigest + } } diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.kt b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.kt index f914f31e37..964b58c95d 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.kt +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushAttachmentData.kt @@ -22,5 +22,5 @@ data class PushAttachmentData( val outputStreamFactory: OutputStreamFactory, val listener: SignalServiceAttachment.ProgressListener?, val cancelationSignal: CancelationSignal?, - val resumableUploadSpec: ResumableUploadSpec + val resumableUploadSpec: ResumableUploadSpec? = null ) diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index ae321fe1ef..171730b509 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -930,6 +930,10 @@ public class PushServiceSocket { } public String getResumableUploadUrl(AttachmentUploadForm uploadForm) throws IOException { + return getResumableUploadUrl(uploadForm, null); + } + + public String getResumableUploadUrl(AttachmentUploadForm uploadForm, @Nullable String checksumSha256) throws IOException { ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(uploadForm.cdn), random); OkHttpClient okHttpClient = connectionHolder.getClient() .newBuilder() @@ -957,6 +961,9 @@ public class PushServiceSocket { } else if (uploadForm.cdn == 3) { request.addHeader("Upload-Defer-Length", "1") .addHeader("Tus-Resumable", "1.0.0"); + if (checksumSha256 != null) { + request.addHeader("x-signal-checksum-sha256", checksumSha256); + } } else { throw new AssertionError("Unknown CDN version: " + uploadForm.cdn); } @@ -984,6 +991,75 @@ public class PushServiceSocket { } } + public AttachmentDigest createAndUploadToCdn3(AttachmentUploadForm uploadForm, + @Nullable String checksumSha256, + PushAttachmentData attachmentData) + throws IOException + { + ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(3), random); + OkHttpClient okHttpClient = connectionHolder.getClient() + .newBuilder() + .connectTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS) + .readTimeout(soTimeoutMillis, TimeUnit.MILLISECONDS) + .build(); + DigestingRequestBody file = new DigestingRequestBody(attachmentData.getData(), attachmentData.getOutputStreamFactory(), "application/offset+octet-stream", attachmentData.getDataSize(), attachmentData.getIncremental(), attachmentData.getListener(), attachmentData.getCancelationSignal(), 0); + + Request.Builder request = new Request.Builder().url(buildConfiguredUrl(connectionHolder, uploadForm.signedUploadLocation)) + .post(file) + .addHeader("Upload-Length", String.valueOf(attachmentData.getDataSize())) + .addHeader("Tus-Resumable", "1.0.0"); + + for (Map.Entry header : uploadForm.headers.entrySet()) { + if (!header.getKey().equalsIgnoreCase("host")) { + request.header(header.getKey(), header.getValue()); + } + } + + if (checksumSha256 != null) { + request.addHeader("x-signal-checksum-sha256", checksumSha256); + } + + if (connectionHolder.getHostHeader().isPresent()) { + request.header("host", connectionHolder.getHostHeader().get()); + } + + Call call = okHttpClient.newCall(request.build()); + + synchronized (connections) { + connections.add(call); + } + + try (Response response = call.execute()) { + if (response.isSuccessful()) { + return file.getAttachmentDigest(); + } else { + throw new NonSuccessfulResponseCodeException(response.code(), "Response: " + response, response.body().string()); + } + } catch (PushNetworkException | NonSuccessfulResponseCodeException e) { + throw e; + } catch (IOException e) { + if (e instanceof StreamResetException) { + throw e; + } + throw new PushNetworkException(e); + } finally { + synchronized (connections) { + connections.remove(call); + } + } + } + + public void uploadBackupFile(AttachmentUploadForm uploadForm, + @Nullable String checksumSha256, + InputStream data, + long length, + ProgressListener progressListener, + CancelationSignal cancelationSignal) + throws IOException + { + createAndUploadToCdn3(uploadForm, checksumSha256, new PushAttachmentData(null, data, length, false, new NoCipherOutputStreamFactory(), progressListener, cancelationSignal, null)); + } + private AttachmentDigest uploadToCdn2(String resumableUrl, InputStream data, String contentType, long length, boolean incremental, OutputStreamFactory outputStreamFactory, ProgressListener progressListener, CancelationSignal cancelationSignal) throws IOException { ConnectionHolder connectionHolder = getRandom(cdnClientsMap.get(2), random); OkHttpClient okHttpClient = connectionHolder.getClient() @@ -1042,7 +1118,7 @@ public class PushServiceSocket { if (uploadForm.cdn == 2) { uploadToCdn2(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null); } else { - uploadToCdn3(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null, uploadForm.headers); + uploadToCdn3(resumableUploadUrl, data, "application/offset+octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null, uploadForm.headers); } } @@ -1211,6 +1287,9 @@ public class PushServiceSocket { } catch (PushNetworkException | NonSuccessfulResponseCodeException e) { throw e; } catch (IOException e) { + if (e instanceof StreamResetException || e instanceof ResumeLocationInvalidException) { + throw e; + } throw new PushNetworkException(e); } finally { synchronized (connections) {