Add a job to backfill attachment uploads to the archive service.

This commit is contained in:
Greyson Parrelli
2024-04-18 11:23:58 -04:00
parent 1e4d96b7c4
commit a82b9ee25f
17 changed files with 567 additions and 113 deletions

View File

@@ -0,0 +1,105 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.attachments
import android.content.Context
import android.graphics.Bitmap
import android.os.Build
import org.signal.core.util.logging.Log
import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.blurhash.BlurHashEncoder
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.util.MediaUtil
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import java.io.IOException
import java.util.Objects
/**
* A place collect common attachment upload operations to allow for code reuse.
*/
object AttachmentUploadUtil {
private val TAG = Log.tag(AttachmentUploadUtil::class.java)
/**
* Builds a [SignalServiceAttachmentStream] from the provided data, which can then be provided to various upload methods.
*/
@Throws(IOException::class)
fun buildSignalServiceAttachmentStream(
context: Context,
attachment: Attachment,
uploadSpec: ResumableUpload,
cancellationSignal: (() -> Boolean)? = null,
progressListener: ProgressListener? = null
): SignalServiceAttachmentStream {
val inputStream = PartAuthority.getAttachmentStream(context, attachment.uri!!)
val builder = SignalServiceAttachment.newStreamBuilder()
.withStream(inputStream)
.withContentType(attachment.contentType)
.withLength(attachment.size)
.withFileName(attachment.fileName)
.withVoiceNote(attachment.voiceNote)
.withBorderless(attachment.borderless)
.withGif(attachment.videoGif)
.withFaststart(attachment.transformProperties?.mp4FastStart ?: false)
.withWidth(attachment.width)
.withHeight(attachment.height)
.withUploadTimestamp(System.currentTimeMillis())
.withCaption(attachment.caption)
.withResumableUploadSpec(ResumableUploadSpec.from(uploadSpec))
.withCancelationSignal(cancellationSignal)
.withListener(progressListener)
if (MediaUtil.isImageType(attachment.contentType)) {
builder.withBlurHash(getImageBlurHash(context, attachment))
} else if (MediaUtil.isVideoType(attachment.contentType)) {
builder.withBlurHash(getVideoBlurHash(context, attachment))
}
return builder.build()
}
@Throws(IOException::class)
private fun getImageBlurHash(context: Context, attachment: Attachment): String? {
if (attachment.blurHash != null) {
return attachment.blurHash!!.hash
}
if (attachment.uri == null) {
return null
}
return PartAuthority.getAttachmentStream(context, attachment.uri!!).use { inputStream ->
BlurHashEncoder.encode(inputStream)
}
}
@Throws(IOException::class)
private fun getVideoBlurHash(context: Context, attachment: Attachment): String? {
if (attachment.blurHash != null) {
return attachment.blurHash.hash
}
if (Build.VERSION.SDK_INT < 23) {
Log.w(TAG, "Video thumbnails not supported...")
return null
}
return MediaUtil.getVideoThumbnail(context, Objects.requireNonNull(attachment.uri), 1000)?.let { bitmap ->
val thumb = Bitmap.createScaledBitmap(bitmap, 100, 100, false)
bitmap.recycle()
Log.i(TAG, "Generated video thumbnail...")
val hash = BlurHashEncoder.encode(thumb)
thumb.recycle()
hash
}
}
}

View File

@@ -346,6 +346,10 @@ object BackupRepository {
return api return api
.triggerBackupIdReservation(backupKey) .triggerBackupIdReservation(backupKey)
.then { getAuthCredential() } .then { getAuthCredential() }
.then { credential ->
api.setPublicKey(backupKey, credential)
.map { credential }
}
.then { credential -> .then { credential ->
val mediaName = attachment.getMediaName() val mediaName = attachment.getMediaName()
val request = attachment.toArchiveMediaRequest(mediaName, backupKey) val request = attachment.toArchiveMediaRequest(mediaName, backupKey)

View File

@@ -98,7 +98,6 @@ import java.security.NoSuchAlgorithmException
import java.util.LinkedList import java.util.LinkedList
import java.util.Optional import java.util.Optional
import java.util.UUID import java.util.UUID
import kotlin.time.Duration.Companion.days
class AttachmentTable( class AttachmentTable(
context: Context, context: Context,
@@ -147,6 +146,7 @@ class AttachmentTable(
const val ARCHIVE_MEDIA_NAME = "archive_media_name" const val ARCHIVE_MEDIA_NAME = "archive_media_name"
const val ARCHIVE_MEDIA_ID = "archive_media_id" const val ARCHIVE_MEDIA_ID = "archive_media_id"
const val ARCHIVE_TRANSFER_FILE = "archive_transfer_file" const val ARCHIVE_TRANSFER_FILE = "archive_transfer_file"
const val ARCHIVE_TRANSFER_STATE = "archive_transfer_state"
const val ATTACHMENT_JSON_ALIAS = "attachment_json" const val ATTACHMENT_JSON_ALIAS = "attachment_json"
@@ -201,7 +201,8 @@ class AttachmentTable(
ARCHIVE_TRANSFER_FILE ARCHIVE_TRANSFER_FILE
) )
const val CREATE_TABLE = """ @JvmField
val CREATE_TABLE = """
CREATE TABLE $TABLE_NAME ( CREATE TABLE $TABLE_NAME (
$ID INTEGER PRIMARY KEY AUTOINCREMENT, $ID INTEGER PRIMARY KEY AUTOINCREMENT,
$MESSAGE_ID INTEGER, $MESSAGE_ID INTEGER,
@@ -239,7 +240,8 @@ class AttachmentTable(
$ARCHIVE_CDN INTEGER DEFAULT 0, $ARCHIVE_CDN INTEGER DEFAULT 0,
$ARCHIVE_MEDIA_NAME TEXT DEFAULT NULL, $ARCHIVE_MEDIA_NAME TEXT DEFAULT NULL,
$ARCHIVE_MEDIA_ID TEXT DEFAULT NULL, $ARCHIVE_MEDIA_ID TEXT DEFAULT NULL,
$ARCHIVE_TRANSFER_FILE TEXT DEFAULT NULL $ARCHIVE_TRANSFER_FILE TEXT DEFAULT NULL,
$ARCHIVE_TRANSFER_STATE INTEGER DEFAULT ${ArchiveTransferState.NONE.value}
) )
""" """
@@ -254,8 +256,6 @@ class AttachmentTable(
"CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON $TABLE_NAME ($ARCHIVE_MEDIA_ID);" "CREATE INDEX IF NOT EXISTS attachment_archive_media_id_index ON $TABLE_NAME ($ARCHIVE_MEDIA_ID);"
) )
val ATTACHMENT_POINTER_REUSE_THRESHOLD = 7.days.inWholeMilliseconds
@JvmStatic @JvmStatic
@Throws(IOException::class) @Throws(IOException::class)
fun newDataFile(context: Context): File { fun newDataFile(context: Context): File {
@@ -426,6 +426,78 @@ class AttachmentTable(
}.flatten() }.flatten()
} }
/**
* Finds the next eligible attachment that needs to be uploaded to the archive service.
* If it exists, it'll also atomically be marked as [ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS].
*/
fun getNextAttachmentToArchiveAndMarkUploadInProgress(): DatabaseAttachment? {
return writableDatabase.withinTransaction {
val record: DatabaseAttachment? = readableDatabase
.select(*PROJECTION)
.from(TABLE_NAME)
.where("$ARCHIVE_TRANSFER_STATE = ? AND $DATA_FILE NOT NULL AND $TRANSFER_STATE = $TRANSFER_PROGRESS_DONE", ArchiveTransferState.NONE.value)
.orderBy("$ID DESC")
.limit(1)
.run()
.readToSingleObject { it.readAttachment() }
if (record != null) {
writableDatabase
.update(TABLE_NAME)
.values(ARCHIVE_TRANSFER_STATE to ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS.value)
.where("$ID = ?", record.attachmentId)
.run()
}
record
}
}
/**
* Returns the current archive transfer state, if the attachment can be found.
*/
fun getArchiveTransferState(id: AttachmentId): ArchiveTransferState? {
return readableDatabase
.select(ARCHIVE_TRANSFER_STATE)
.from(TABLE_NAME)
.where("$ID = ?", id.id)
.run()
.readToSingleObject { ArchiveTransferState.deserialize(it.requireInt(ARCHIVE_TRANSFER_STATE)) }
}
/**
* Sets the archive transfer state for the given attachment and all other attachments that share the same data file.
*/
fun setArchiveTransferState(id: AttachmentId, state: ArchiveTransferState) {
writableDatabase.withinTransaction {
val dataFile: String = readableDatabase
.select(DATA_FILE)
.from(TABLE_NAME)
.where("$ID = ?", id.id)
.run()
.readToSingleObject { it.requireString(DATA_FILE) } ?: return@withinTransaction
writableDatabase
.update(TABLE_NAME)
.values(ARCHIVE_TRANSFER_STATE to state.value)
.where("$DATA_FILE = ?", dataFile)
.run()
}
}
/**
* Resets any in-progress archive backfill states to [ArchiveTransferState.NONE], returning the number that had to be reset.
* This should only be called if you believe the backfill process has finished. In this case, if this returns a value > 0,
* it indicates that state was mis-tracked and you should try uploading again.
*/
fun resetPendingArchiveBackfills(): Int {
return writableDatabase
.update(TABLE_NAME)
.values(ARCHIVE_TRANSFER_STATE to ArchiveTransferState.NONE.value)
.where("$ARCHIVE_TRANSFER_STATE == ${ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS.value} || $ARCHIVE_TRANSFER_STATE == ${ArchiveTransferState.BACKFILL_UPLOADED.value}")
.run()
}
fun deleteAttachmentsForMessage(mmsId: Long): Boolean { fun deleteAttachmentsForMessage(mmsId: Long): Boolean {
Log.d(TAG, "[deleteAttachmentsForMessage] mmsId: $mmsId") Log.d(TAG, "[deleteAttachmentsForMessage] mmsId: $mmsId")
@@ -1992,4 +2064,44 @@ class AttachmentTable(
} }
} }
} }
/**
* This maintains two different state paths for uploading attachments to the archive.
*
* The first is the backfill process, which will happen after newly-enabling backups. That process will go:
* 1. [NONE]
* 2. [BACKFILL_UPLOAD_IN_PROGRESS]
* 3. [BACKFILL_UPLOADED]
* 4. [FINISHED] or [PERMANENT_FAILURE]
*
* The second is when newly sending/receiving an attachment after enabling backups. That process will go:
* 1. [NONE]
* 2. [ATTACHMENT_TRANSFER_PENDING]
* 3. [FINISHED] or [PERMANENT_FAILURE]
*/
enum class ArchiveTransferState(val value: Int) {
/** Not backed up at all. */
NONE(0),
/** The upload to the attachment service is in progress. */
BACKFILL_UPLOAD_IN_PROGRESS(1),
/** Successfully uploaded to the attachment service during the backfill process. Still need to tell the service to move the file over to the archive service. */
BACKFILL_UPLOADED(2),
/** Completely finished backing up the attachment. */
FINISHED(3),
/** It is impossible to upload this attachment. */
PERMANENT_FAILURE(4),
/** We sent/received this attachment after enabling backups, but still need to transfer the file to the archive service. */
ATTACHMENT_TRANSFER_PENDING(5);
companion object {
fun deserialize(value: Int): ArchiveTransferState {
return values().firstOrNull { it.value == value } ?: NONE
}
}
}
} }

View File

@@ -84,6 +84,7 @@ import org.thoughtcrime.securesms.database.helpers.migration.V223_AddNicknameAnd
import org.thoughtcrime.securesms.database.helpers.migration.V224_AddAttachmentArchiveColumns import org.thoughtcrime.securesms.database.helpers.migration.V224_AddAttachmentArchiveColumns
import org.thoughtcrime.securesms.database.helpers.migration.V225_AddLocalUserJoinedStateAndGroupCallActiveState import org.thoughtcrime.securesms.database.helpers.migration.V225_AddLocalUserJoinedStateAndGroupCallActiveState
import org.thoughtcrime.securesms.database.helpers.migration.V226_AddAttachmentMediaIdIndex import org.thoughtcrime.securesms.database.helpers.migration.V226_AddAttachmentMediaIdIndex
import org.thoughtcrime.securesms.database.helpers.migration.V227_AddAttachmentArchiveTransferState
/** /**
* Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness. * Contains all of the database migrations for [SignalDatabase]. Broken into a separate file for cleanliness.
@@ -170,10 +171,11 @@ object SignalDatabaseMigrations {
223 to V223_AddNicknameAndNoteFieldsToRecipientTable, 223 to V223_AddNicknameAndNoteFieldsToRecipientTable,
224 to V224_AddAttachmentArchiveColumns, 224 to V224_AddAttachmentArchiveColumns,
225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState, 225 to V225_AddLocalUserJoinedStateAndGroupCallActiveState,
226 to V226_AddAttachmentMediaIdIndex 226 to V226_AddAttachmentMediaIdIndex,
227 to V227_AddAttachmentArchiveTransferState
) )
const val DATABASE_VERSION = 226 const val DATABASE_VERSION = 227
@JvmStatic @JvmStatic
fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) { fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {

View File

@@ -0,0 +1,19 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.database.helpers.migration
import android.app.Application
import net.zetetic.database.sqlcipher.SQLiteDatabase
/**
* Adds a new column to track the status of transferring attachments to the archive service.
*/
object V227_AddAttachmentArchiveTransferState : SignalDatabaseMigration {
override fun migrate(context: Application, db: SQLiteDatabase, oldVersion: Int, newVersion: Int) {
db.execSQL("ALTER TABLE attachment ADD COLUMN archive_transfer_state INTEGER DEFAULT 0")
}
}

View File

@@ -1,14 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobmanager
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil
import org.thoughtcrime.securesms.util.FeatureFlags
/**
* Helper to calculate the default backoff interval for a [Job] given it's run attempt count.
*/
fun Job.defaultBackoffInterval(): Long = BackoffUtil.exponentialBackoff(runAttempt + 1, FeatureFlags.getDefaultMaxBackoff())

View File

@@ -0,0 +1,226 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.attachments.PointerAttachment
import org.thoughtcrime.securesms.backup.v2.BackupRepository
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.protos.ArchiveAttachmentBackfillJobData
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.archive.ArchiveMediaResponse
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer
import java.io.IOException
import java.util.Optional
import kotlin.time.Duration.Companion.days
/**
* When run, this will find the next attachment that needs to be uploaded to the archive service and upload it.
* It will enqueue a copy of itself if it thinks there is more work to be done, and that copy will continue the upload process.
*/
class ArchiveAttachmentBackfillJob private constructor(
parameters: Parameters,
private var attachmentId: AttachmentId?,
private var uploadSpec: ResumableUpload?
) : Job(parameters) {
companion object {
private val TAG = Log.tag(ArchiveAttachmentBackfillJob::class.java)
const val KEY = "ArchiveAttachmentBackfillJob"
}
constructor() : this(
parameters = Parameters.Builder()
.setQueue("ArchiveAttachmentBackfillJob")
.setMaxInstancesForQueue(2)
.setLifespan(30.days.inWholeMilliseconds)
.setMaxAttempts(Parameters.UNLIMITED)
.addConstraint(NetworkConstraint.KEY)
.build(),
attachmentId = null,
uploadSpec = null
)
override fun serialize(): ByteArray {
return ArchiveAttachmentBackfillJobData(
attachmentId = attachmentId?.id,
uploadSpec = uploadSpec
).encode()
}
override fun getFactoryKey(): String = KEY
override fun run(): Result {
var attachmentRecord: DatabaseAttachment? = if (attachmentId != null) {
Log.i(TAG, "Retrying $attachmentId")
SignalDatabase.attachments.getAttachment(attachmentId!!)
} else {
SignalDatabase.attachments.getNextAttachmentToArchiveAndMarkUploadInProgress()
}
if (attachmentRecord == null && attachmentId != null) {
Log.w(TAG, "Attachment $attachmentId was not found! Was likely deleted during the process of archiving. Re-enqueuing job with no ID.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
return Result.success()
}
// TODO [backup] If we ever wanted to allow multiple instances of this job to run in parallel, this would have to be done somewhere else
if (attachmentRecord == null) {
Log.i(TAG, "No more attachments to backfill! Ensuring there's no dangling state.")
val resetCount = SignalDatabase.attachments.resetPendingArchiveBackfills()
if (resetCount > 0) {
Log.w(TAG, "We thought we were done, but $resetCount items were still in progress! Need to run again to retry.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
} else {
Log.i(TAG, "All good! Should be done.")
}
return Result.success()
}
attachmentId = attachmentRecord.attachmentId
val transferState: AttachmentTable.ArchiveTransferState? = SignalDatabase.attachments.getArchiveTransferState(attachmentRecord.attachmentId)
if (transferState == null) {
Log.w(TAG, "Attachment $attachmentId was not found when looking for the transfer state! Was likely just deleted. Re-enqueuing job with no ID.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
return Result.success()
}
Log.i(TAG, "Current state: $transferState")
if (transferState == AttachmentTable.ArchiveTransferState.FINISHED) {
Log.i(TAG, "Attachment $attachmentId is already finished. Skipping.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
return Result.success()
}
if (transferState == AttachmentTable.ArchiveTransferState.PERMANENT_FAILURE) {
Log.i(TAG, "Attachment $attachmentId is already marked as a permanent failure. Skipping.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
return Result.success()
}
if (transferState == AttachmentTable.ArchiveTransferState.ATTACHMENT_TRANSFER_PENDING) {
Log.i(TAG, "Attachment $attachmentId is already marked as pending transfer, meaning it's a send attachment that will be uploaded on it's own. Skipping.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
return Result.success()
}
if (transferState == AttachmentTable.ArchiveTransferState.BACKFILL_UPLOAD_IN_PROGRESS) {
if (uploadSpec == null || System.currentTimeMillis() > uploadSpec!!.timeout) {
Log.d(TAG, "Need an upload spec. Fetching...")
uploadSpec = ApplicationDependencies.getSignalServiceMessageSender().getResumableUploadSpec().toProto()
} else {
Log.d(TAG, "Already have an upload spec. Continuing...")
}
val attachmentStream = try {
AttachmentUploadUtil.buildSignalServiceAttachmentStream(
context = context,
attachment = attachmentRecord,
uploadSpec = uploadSpec!!,
cancellationSignal = { this.isCanceled }
)
} catch (e: IOException) {
Log.e(TAG, "Failed to get attachment stream for $attachmentId", e)
return Result.retry(defaultBackoff())
}
Log.d(TAG, "Beginning upload...")
val remoteAttachment: SignalServiceAttachmentPointer = try {
ApplicationDependencies.getSignalServiceMessageSender().uploadAttachment(attachmentStream)
} catch (e: IOException) {
Log.w(TAG, "Failed to upload $attachmentId", e)
return Result.retry(defaultBackoff())
}
Log.d(TAG, "Upload complete!")
val pointerAttachment: Attachment = PointerAttachment.forPointer(Optional.of(remoteAttachment), null, attachmentRecord.fastPreflightId).get()
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachmentRecord.attachmentId, pointerAttachment, remoteAttachment.uploadTimestamp)
SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.BACKFILL_UPLOADED)
attachmentRecord = SignalDatabase.attachments.getAttachment(attachmentRecord.attachmentId)
}
if (attachmentRecord == null) {
Log.w(TAG, "$attachmentId was not found after uploading! Possibly deleted in a narrow race condition. Re-enqueuing job with no ID.")
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
return Result.success()
}
Log.d(TAG, "Moving attachment to archive...")
return when (val result = BackupRepository.archiveMedia(attachmentRecord)) {
is NetworkResult.Success -> {
Log.d(TAG, "Move complete!")
SignalDatabase.attachments.setArchiveTransferState(attachmentRecord.attachmentId, AttachmentTable.ArchiveTransferState.FINISHED)
ApplicationDependencies.getJobManager().add(ArchiveAttachmentBackfillJob())
Result.success()
}
is NetworkResult.ApplicationError -> {
Log.w(TAG, "Failed to archive ${attachmentRecord.attachmentId} due to an application error. Retrying.", result.throwable)
Result.retry(defaultBackoff())
}
is NetworkResult.NetworkError -> {
Log.w(TAG, "Encountered a transient network error. Retrying.")
Result.retry(defaultBackoff())
}
is NetworkResult.StatusCodeError -> {
Log.w(TAG, "Failed request with status code ${result.code} for ${attachmentRecord.attachmentId}")
when (ArchiveMediaResponse.StatusCodes.from(result.code)) {
ArchiveMediaResponse.StatusCodes.BadArguments,
ArchiveMediaResponse.StatusCodes.InvalidPresentationOrSignature,
ArchiveMediaResponse.StatusCodes.InsufficientPermissions,
ArchiveMediaResponse.StatusCodes.RateLimited -> {
Result.retry(defaultBackoff())
}
ArchiveMediaResponse.StatusCodes.NoMediaSpaceRemaining -> {
// TODO [backup] This will end the process right away. We need to integrate this with client-driven retry UX.
Result.failure()
}
ArchiveMediaResponse.StatusCodes.Unknown -> {
Result.retry(defaultBackoff())
}
}
}
}
}
override fun onFailure() {
attachmentId?.let { id ->
Log.w(TAG, "Failed to archive $id!")
}
}
class Factory : Job.Factory<ArchiveAttachmentBackfillJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): ArchiveAttachmentBackfillJob {
val data = serializedData?.let { ArchiveAttachmentBackfillJobData.ADAPTER.decode(it) }
return ArchiveAttachmentBackfillJob(
parameters = parameters,
attachmentId = data?.attachmentId?.let { AttachmentId(it) },
uploadSpec = data?.uploadSpec
)
}
}
}

View File

@@ -12,7 +12,6 @@ import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.defaultBackoffInterval
import java.io.File import java.io.File
import java.io.FileNotFoundException import java.io.FileNotFoundException
import java.io.IOException import java.io.IOException
@@ -84,7 +83,7 @@ class AttachmentHashBackfillJob private constructor(parameters: Parameters) : Jo
Log.w(TAG, "Underlying cause was a FileNotFoundException. Clearing all usages.", true) Log.w(TAG, "Underlying cause was a FileNotFoundException. Clearing all usages.", true)
SignalDatabase.attachments.clearUsagesOfDataFile(file) SignalDatabase.attachments.clearUsagesOfDataFile(file)
} else { } else {
return Result.retry(defaultBackoffInterval()) return Result.retry(defaultBackoff())
} }
} }

View File

@@ -4,8 +4,6 @@
*/ */
package org.thoughtcrime.securesms.jobs package org.thoughtcrime.securesms.jobs
import android.graphics.Bitmap
import android.os.Build
import android.text.TextUtils import android.text.TextUtils
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import org.signal.core.util.inRoundedDays import org.signal.core.util.inRoundedDays
@@ -15,8 +13,8 @@ import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.R
import org.thoughtcrime.securesms.attachments.Attachment import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
import org.thoughtcrime.securesms.attachments.PointerAttachment import org.thoughtcrime.securesms.attachments.PointerAttachment
import org.thoughtcrime.securesms.blurhash.BlurHashEncoder
import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
@@ -26,20 +24,16 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec
import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData
import org.thoughtcrime.securesms.mms.MmsException import org.thoughtcrime.securesms.mms.MmsException
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.net.NotPushRegisteredException import org.thoughtcrime.securesms.net.NotPushRegisteredException
import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.service.AttachmentProgressService import org.thoughtcrime.securesms.service.AttachmentProgressService
import org.thoughtcrime.securesms.util.FeatureFlags import org.thoughtcrime.securesms.util.FeatureFlags
import org.thoughtcrime.securesms.util.MediaUtil
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import java.io.IOException import java.io.IOException
import java.util.Objects
import java.util.Optional import java.util.Optional
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.days
@@ -210,23 +204,12 @@ class AttachmentUploadJob private constructor(
} }
return try { return try {
val inputStream = PartAuthority.getAttachmentStream(context, attachment.uri!!) AttachmentUploadUtil.buildSignalServiceAttachmentStream(
val builder = SignalServiceAttachment.newStreamBuilder() context = context,
.withStream(inputStream) attachment = attachment,
.withContentType(attachment.contentType) uploadSpec = resumableUploadSpec,
.withLength(attachment.size) cancellationSignal = { isCanceled },
.withFileName(attachment.fileName) progressListener = object : SignalServiceAttachment.ProgressListener {
.withVoiceNote(attachment.voiceNote)
.withBorderless(attachment.borderless)
.withGif(attachment.videoGif)
.withFaststart(attachment.transformProperties?.mp4FastStart ?: false)
.withWidth(attachment.width)
.withHeight(attachment.height)
.withUploadTimestamp(System.currentTimeMillis())
.withCaption(attachment.caption)
.withResumableUploadSpec(ResumableUploadSpec.from(resumableUploadSpec))
.withCancelationSignal { this.isCanceled }
.withListener(object : SignalServiceAttachment.ProgressListener {
override fun onAttachmentProgress(total: Long, progress: Long) { override fun onAttachmentProgress(total: Long, progress: Long) {
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
notification?.progress = (progress.toFloat() / total) notification?.progress = (progress.toFloat() / total)
@@ -235,58 +218,13 @@ class AttachmentUploadJob private constructor(
override fun shouldCancel(): Boolean { override fun shouldCancel(): Boolean {
return isCanceled return isCanceled
} }
})
if (MediaUtil.isImageType(attachment.contentType)) {
builder.withBlurHash(getImageBlurHash(attachment)).build()
} else if (MediaUtil.isVideoType(attachment.contentType)) {
builder.withBlurHash(getVideoBlurHash(attachment)).build()
} else {
builder.build()
} }
)
} catch (e: IOException) { } catch (e: IOException) {
throw InvalidAttachmentException(e) throw InvalidAttachmentException(e)
} }
} }
@Throws(IOException::class)
private fun getImageBlurHash(attachment: Attachment): String? {
if (attachment.blurHash != null) {
return attachment.blurHash!!.hash
}
if (attachment.uri == null) {
return null
}
return PartAuthority.getAttachmentStream(context, attachment.uri!!).use { inputStream ->
BlurHashEncoder.encode(inputStream)
}
}
@Throws(IOException::class)
private fun getVideoBlurHash(attachment: Attachment): String? {
if (attachment.blurHash != null) {
return attachment.blurHash!!.hash
}
if (Build.VERSION.SDK_INT < 23) {
Log.w(TAG, "Video thumbnails not supported...")
return null
}
return MediaUtil.getVideoThumbnail(context, Objects.requireNonNull(attachment.uri), 1000)?.let { bitmap ->
val thumb = Bitmap.createScaledBitmap(bitmap, 100, 100, false)
bitmap.recycle()
Log.i(TAG, "Generated video thumbnail...")
val hash = BlurHashEncoder.encode(thumb)
thumb.recycle()
hash
}
}
private inner class InvalidAttachmentException : Exception { private inner class InvalidAttachmentException : Exception {
constructor(message: String?) : super(message) constructor(message: String?) : super(message)
constructor(e: Exception?) : super(e) constructor(e: Exception?) : super(e)

View File

@@ -101,6 +101,7 @@ public final class JobManagerFactories {
put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory()); put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory());
put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory()); put(AnalyzeDatabaseJob.KEY, new AnalyzeDatabaseJob.Factory());
put(ArchiveAttachmentJob.KEY, new ArchiveAttachmentJob.Factory()); put(ArchiveAttachmentJob.KEY, new ArchiveAttachmentJob.Factory());
put(ArchiveAttachmentBackfillJob.KEY, new ArchiveAttachmentBackfillJob.Factory());
put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory()); put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory());
put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory()); put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory());
put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory()); put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory());

View File

@@ -269,7 +269,7 @@ class PreKeysSyncJob private constructor(
return when (result) { return when (result) {
is NetworkResult.Success -> true is NetworkResult.Success -> true
is NetworkResult.NetworkError -> throw result.throwable ?: PushNetworkException("Network error") is NetworkResult.NetworkError -> throw result.exception ?: PushNetworkException("Network error")
is NetworkResult.ApplicationError -> throw result.throwable is NetworkResult.ApplicationError -> throw result.throwable
is NetworkResult.StatusCodeError -> if (result.code == 409) { is NetworkResult.StatusCodeError -> if (result.code == 409) {
false false

View File

@@ -51,3 +51,8 @@ message PreKeysSyncJobData {
message ArchiveAttachmentJobData { message ArchiveAttachmentJobData {
uint64 attachmentId = 1; uint64 attachmentId = 1;
} }
message ArchiveAttachmentBackfillJobData {
optional uint64 attachmentId = 1;
ResumableUpload uploadSpec = 2;
}

View File

@@ -6,7 +6,6 @@
package org.whispersystems.signalservice.api package org.whispersystems.signalservice.api
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
import java.io.IOException import java.io.IOException
/** /**
@@ -33,7 +32,7 @@ sealed class NetworkResult<T> {
fun <T> fromFetch(fetch: () -> T): NetworkResult<T> = try { fun <T> fromFetch(fetch: () -> T): NetworkResult<T> = try {
Success(fetch()) Success(fetch())
} catch (e: NonSuccessfulResponseCodeException) { } catch (e: NonSuccessfulResponseCodeException) {
StatusCodeError(e.code, e) StatusCodeError(e.code, e.body, e)
} catch (e: IOException) { } catch (e: IOException) {
NetworkError(e) NetworkError(e)
} catch (e: Throwable) { } catch (e: Throwable) {
@@ -45,10 +44,10 @@ sealed class NetworkResult<T> {
data class Success<T>(val result: T) : NetworkResult<T>() data class Success<T>(val result: T) : NetworkResult<T>()
/** Indicates a generic network error occurred before we were able to process a response. */ /** Indicates a generic network error occurred before we were able to process a response. */
data class NetworkError<T>(val throwable: Throwable? = null) : NetworkResult<T>() data class NetworkError<T>(val exception: IOException) : NetworkResult<T>()
/** Indicates we got a response, but it was a non-2xx response. */ /** Indicates we got a response, but it was a non-2xx response. */
data class StatusCodeError<T>(val code: Int, val throwable: Throwable? = null) : NetworkResult<T>() data class StatusCodeError<T>(val code: Int, val body: String?, val exception: IOException) : NetworkResult<T>()
/** Indicates that the application somehow failed in a way unrelated to network activity. Usually a runtime crash. */ /** Indicates that the application somehow failed in a way unrelated to network activity. Usually a runtime crash. */
data class ApplicationError<T>(val throwable: Throwable) : NetworkResult<T>() data class ApplicationError<T>(val throwable: Throwable) : NetworkResult<T>()
@@ -59,8 +58,8 @@ sealed class NetworkResult<T> {
fun successOrThrow(): T { fun successOrThrow(): T {
when (this) { when (this) {
is Success -> return result is Success -> return result
is NetworkError -> throw throwable ?: PushNetworkException("Network error") is NetworkError -> throw exception
is StatusCodeError -> throw throwable ?: NonSuccessfulResponseCodeException(this.code) is StatusCodeError -> throw exception
is ApplicationError -> throw throwable is ApplicationError -> throw throwable
} }
} }
@@ -72,8 +71,8 @@ sealed class NetworkResult<T> {
fun <R> map(transform: (T) -> R): NetworkResult<R> { fun <R> map(transform: (T) -> R): NetworkResult<R> {
return when (this) { return when (this) {
is Success -> Success(transform(this.result)) is Success -> Success(transform(this.result))
is NetworkError -> NetworkError(throwable) is NetworkError -> NetworkError(exception)
is StatusCodeError -> StatusCodeError(code, throwable) is StatusCodeError -> StatusCodeError(code, body, exception)
is ApplicationError -> ApplicationError(throwable) is ApplicationError -> ApplicationError(throwable)
} }
} }
@@ -85,8 +84,8 @@ sealed class NetworkResult<T> {
fun <R> then(result: (T) -> NetworkResult<R>): NetworkResult<R> { fun <R> then(result: (T) -> NetworkResult<R>): NetworkResult<R> {
return when (this) { return when (this) {
is Success -> result(this.result) is Success -> result(this.result)
is NetworkError -> NetworkError(throwable) is NetworkError -> NetworkError(exception)
is StatusCodeError -> StatusCodeError(code, throwable) is StatusCodeError -> StatusCodeError(code, body, exception)
is ApplicationError -> ApplicationError(throwable) is ApplicationError -> ApplicationError(throwable)
} }
} }

View File

@@ -176,6 +176,13 @@ class ArchiveApi(
/** /**
* Copy and re-encrypt media from the attachments cdn into the backup cdn. * Copy and re-encrypt media from the attachments cdn into the backup cdn.
*
* Possible errors:
* 400: Bad arguments, or made on an authenticated channel
* 401: Invalid presentation or signature
* 403: Insufficient permissions
* 413: No media space remaining
* 429: Rate-limited
*/ */
fun archiveAttachmentMedia( fun archiveAttachmentMedia(
backupKey: BackupKey, backupKey: BackupKey,

View File

@@ -12,4 +12,19 @@ import com.fasterxml.jackson.annotation.JsonProperty
*/ */
class ArchiveMediaResponse( class ArchiveMediaResponse(
@JsonProperty val cdn: Int @JsonProperty val cdn: Int
) ) {
enum class StatusCodes(val code: Int) {
BadArguments(400),
InvalidPresentationOrSignature(401),
InsufficientPermissions(403),
NoMediaSpaceRemaining(413),
RateLimited(429),
Unknown(-1);
companion object {
fun from(code: Int): StatusCodes {
return values().firstOrNull { it.code == code } ?: Unknown
}
}
}
}

View File

@@ -14,15 +14,24 @@ import java.io.IOException;
public class NonSuccessfulResponseCodeException extends IOException { public class NonSuccessfulResponseCodeException extends IOException {
private final int code; private final int code;
private final String body;
public NonSuccessfulResponseCodeException(int code) { public NonSuccessfulResponseCodeException(int code) {
super("StatusCode: " + code); super("StatusCode: " + code);
this.code = code; this.code = code;
this.body = null;
} }
public NonSuccessfulResponseCodeException(int code, String s) { public NonSuccessfulResponseCodeException(int code, String s) {
super("[" + code + "] " + s); super("[" + code + "] " + s);
this.code = code; this.code = code;
this.body = null;
}
public NonSuccessfulResponseCodeException(int code, String s, String body) {
super("[" + code + "] " + s);
this.code = code;
this.body = body;
} }
public int getCode() { public int getCode() {
@@ -36,4 +45,8 @@ public class NonSuccessfulResponseCodeException extends IOException {
public boolean is5xx() { public boolean is5xx() {
return code >= 500 && code < 600; return code >= 500 && code < 600;
} }
public String getBody() {
return body;
}
} }

View File

@@ -327,6 +327,7 @@ public class PushServiceSocket {
private static final Map<String, String> NO_HEADERS = Collections.emptyMap(); private static final Map<String, String> NO_HEADERS = Collections.emptyMap();
private static final ResponseCodeHandler NO_HANDLER = new EmptyResponseCodeHandler(); private static final ResponseCodeHandler NO_HANDLER = new EmptyResponseCodeHandler();
private static final ResponseCodeHandler UNOPINIONATED_HANDER = new UnopinionatedResponseCodeHandler();
private static final long CDN2_RESUMABLE_LINK_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(7); private static final long CDN2_RESUMABLE_LINK_LIFETIME_MILLIS = TimeUnit.DAYS.toMillis(7);
@@ -494,14 +495,14 @@ public class PushServiceSocket {
long secondsRoundedToNearestDay = TimeUnit.DAYS.toSeconds(TimeUnit.MILLISECONDS.toDays(currentTime)); long secondsRoundedToNearestDay = TimeUnit.DAYS.toSeconds(TimeUnit.MILLISECONDS.toDays(currentTime));
long endTimeInSeconds = secondsRoundedToNearestDay + TimeUnit.DAYS.toSeconds(7); long endTimeInSeconds = secondsRoundedToNearestDay + TimeUnit.DAYS.toSeconds(7);
String response = makeServiceRequest(String.format(Locale.US, ARCHIVE_CREDENTIALS, secondsRoundedToNearestDay, endTimeInSeconds), "GET", null); String response = makeServiceRequest(String.format(Locale.US, ARCHIVE_CREDENTIALS, secondsRoundedToNearestDay, endTimeInSeconds), "GET", null, NO_HEADERS, UNOPINIONATED_HANDER, Optional.empty());
return JsonUtil.fromJson(response, ArchiveServiceCredentialsResponse.class); return JsonUtil.fromJson(response, ArchiveServiceCredentialsResponse.class);
} }
public void setArchiveBackupId(BackupAuthCredentialRequest request) throws IOException { public void setArchiveBackupId(BackupAuthCredentialRequest request) throws IOException {
String body = JsonUtil.toJson(new ArchiveSetBackupIdRequest(request)); String body = JsonUtil.toJson(new ArchiveSetBackupIdRequest(request));
makeServiceRequest(ARCHIVE_BACKUP_ID, "PUT", body); makeServiceRequest(ARCHIVE_BACKUP_ID, "PUT", body, NO_HEADERS, UNOPINIONATED_HANDER, Optional.empty());
} }
public void setArchivePublicKey(ECPublicKey publicKey, ArchiveCredentialPresentation credentialPresentation) throws IOException { public void setArchivePublicKey(ECPublicKey publicKey, ArchiveCredentialPresentation credentialPresentation) throws IOException {
@@ -555,7 +556,7 @@ public class PushServiceSocket {
public ArchiveMediaResponse archiveAttachmentMedia(@Nonnull ArchiveCredentialPresentation credentialPresentation, @Nonnull ArchiveMediaRequest request) throws IOException { public ArchiveMediaResponse archiveAttachmentMedia(@Nonnull ArchiveCredentialPresentation credentialPresentation, @Nonnull ArchiveMediaRequest request) throws IOException {
Map<String, String> headers = credentialPresentation.toHeaders(); Map<String, String> headers = credentialPresentation.toHeaders();
String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA, "PUT", JsonUtil.toJson(request), headers, NO_HANDLER); String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA, "PUT", JsonUtil.toJson(request), headers, UNOPINIONATED_HANDER);
return JsonUtil.fromJson(response, ArchiveMediaResponse.class); return JsonUtil.fromJson(response, ArchiveMediaResponse.class);
} }
@@ -566,7 +567,7 @@ public class PushServiceSocket {
public BatchArchiveMediaResponse archiveAttachmentMedia(@Nonnull ArchiveCredentialPresentation credentialPresentation, @Nonnull BatchArchiveMediaRequest request) throws IOException { public BatchArchiveMediaResponse archiveAttachmentMedia(@Nonnull ArchiveCredentialPresentation credentialPresentation, @Nonnull BatchArchiveMediaRequest request) throws IOException {
Map<String, String> headers = credentialPresentation.toHeaders(); Map<String, String> headers = credentialPresentation.toHeaders();
String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA_BATCH, "PUT", JsonUtil.toJson(request), headers, NO_HANDLER); String response = makeServiceRequestWithoutAuthentication(ARCHIVE_MEDIA_BATCH, "PUT", JsonUtil.toJson(request), headers, UNOPINIONATED_HANDER);
return JsonUtil.fromJson(response, BatchArchiveMediaResponse.class); return JsonUtil.fromJson(response, BatchArchiveMediaResponse.class);
} }
@@ -2660,6 +2661,28 @@ public class PushServiceSocket {
public void handle(int responseCode, ResponseBody body) { } public void handle(int responseCode, ResponseBody body) { }
} }
/**
* A {@link ResponseCodeHandler} that only throws {@link NonSuccessfulResponseCodeException} with the response body.
* Any further processing is left to the caller.
*/
private static class UnopinionatedResponseCodeHandler implements ResponseCodeHandler {
@Override
public void handle(int responseCode, ResponseBody body) throws NonSuccessfulResponseCodeException, PushNetworkException {
if (responseCode < 200 || responseCode > 299) {
String bodyString = null;
if (body != null) {
try {
bodyString = readBodyString(body);
} catch (MalformedResponseException e) {
Log.w(TAG, "Failed to read body string", e);
}
}
throw new NonSuccessfulResponseCodeException(responseCode, "Response: " + responseCode, bodyString);
}
}
}
public enum ClientSet { KeyBackup } public enum ClientSet { KeyBackup }
public CredentialResponse retrieveGroupsV2Credentials(long todaySeconds) public CredentialResponse retrieveGroupsV2Credentials(long todaySeconds)