Stop falling back to CDN0 for attachments.

This commit is contained in:
Greyson Parrelli
2023-10-27 12:02:35 -07:00
parent 7fdd7e89bd
commit c4f5110148
13 changed files with 364 additions and 90 deletions
@@ -0,0 +1,268 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import android.graphics.Bitmap
import android.os.Build
import android.text.TextUtils
import org.greenrobot.eventbus.EventBus
import org.signal.core.util.inRoundedDays
import org.signal.core.util.logging.Log
import org.signal.core.util.mebiBytes
import org.signal.protos.resumableuploads.ResumableUpload
import org.thoughtcrime.securesms.R
import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.AttachmentId
import org.thoughtcrime.securesms.attachments.PointerAttachment
import org.thoughtcrime.securesms.blurhash.BlurHashEncoder
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.events.PartProgressEvent
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil.startGenericTaskWhenCapable
import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.net.NotPushRegisteredException
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.service.NotificationController
import org.thoughtcrime.securesms.util.FeatureFlags
import org.thoughtcrime.securesms.util.MediaUtil
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
import java.io.IOException
import java.util.Objects
import java.util.Optional
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.milliseconds
/**
* Uploads an attachment without alteration.
*
* Queue [AttachmentCompressionJob] before to compress.
*/
class AttachmentUploadJob private constructor(
parameters: Parameters,
private val attachmentId: AttachmentId,
private var uploadSpec: ResumableUpload?
) : BaseJob(parameters) {
companion object {
const val KEY = "AttachmentUploadJobV3"
private val TAG = Log.tag(AttachmentUploadJob::class.java)
private val UPLOAD_REUSE_THRESHOLD = TimeUnit.DAYS.toMillis(3)
/**
* Foreground notification shows while uploading attachments above this.
*/
private val FOREGROUND_LIMIT = 10.mebiBytes.inWholeBytes
@JvmStatic
val maxPlaintextSize: Long
get() {
val maxCipherTextSize = FeatureFlags.maxAttachmentSizeBytes()
val maxPaddedSize = AttachmentCipherStreamUtil.getPlaintextLength(maxCipherTextSize)
return PaddingInputStream.getMaxUnpaddedSize(maxPaddedSize)
}
}
constructor(attachmentId: AttachmentId) : this(
Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
attachmentId,
null
)
override fun serialize(): ByteArray {
return AttachmentUploadJobData(
attachmentRowId = attachmentId.rowId,
attachmentUniqueId = attachmentId.uniqueId,
uploadSpec = uploadSpec
).encode()
}
override fun getFactoryKey(): String = KEY
override fun shouldTrace(): Boolean = true
@Throws(Exception::class)
public override fun onRun() {
if (!Recipient.self().isRegistered) {
throw NotPushRegisteredException()
}
val messageSender = ApplicationDependencies.getSignalServiceMessageSender()
val databaseAttachment = SignalDatabase.attachments.getAttachment(attachmentId) ?: throw InvalidAttachmentException("Cannot find the specified attachment.")
val timeSinceUpload = System.currentTimeMillis() - databaseAttachment.uploadTimestamp
if (timeSinceUpload < UPLOAD_REUSE_THRESHOLD && !TextUtils.isEmpty(databaseAttachment.location)) {
Log.i(TAG, "We can re-use an already-uploaded file. It was uploaded $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days) ago. Skipping.")
return
} else if (databaseAttachment.uploadTimestamp > 0) {
Log.i(TAG, "This file was previously-uploaded, but too long ago to be re-used. Age: $timeSinceUpload ms (${timeSinceUpload.milliseconds.inRoundedDays()} days)")
}
if (uploadSpec != null && System.currentTimeMillis() > uploadSpec!!.timeout) {
Log.w(TAG, "Upload spec expired! Clearing.")
uploadSpec = null
}
if (uploadSpec == null) {
Log.d(TAG, "Need an upload spec. Fetching...")
uploadSpec = ApplicationDependencies.getSignalServiceMessageSender().getResumableUploadSpec().toProto()
} else {
Log.d(TAG, "Re-using existing upload spec.")
}
Log.i(TAG, "Uploading attachment for message " + databaseAttachment.mmsId + " with ID " + databaseAttachment.attachmentId)
try {
getAttachmentNotificationIfNeeded(databaseAttachment).use { notification ->
buildAttachmentStream(databaseAttachment, notification, uploadSpec!!).use { localAttachment ->
val remoteAttachment = messageSender.uploadAttachment(localAttachment)
val attachment = PointerAttachment.forPointer(Optional.of(remoteAttachment), null, databaseAttachment.fastPreflightId).get()
SignalDatabase.attachments.updateAttachmentAfterUpload(databaseAttachment.attachmentId, attachment, remoteAttachment.uploadTimestamp)
}
}
} catch (e: NonSuccessfulResumableUploadResponseCodeException) {
if (e.code == 400) {
Log.w(TAG, "Failed to upload due to a 400 when getting resumable upload information. Clearing upload spec.", e)
uploadSpec = null
}
throw e
}
}
private fun getAttachmentNotificationIfNeeded(attachment: Attachment): NotificationController? {
return if (attachment.size >= FOREGROUND_LIMIT) {
try {
startGenericTaskWhenCapable(context, context.getString(R.string.AttachmentUploadJob_uploading_media))
} catch (e: UnableToStartException) {
Log.w(TAG, "Unable to start foreground service", e)
null
}
} else {
null
}
}
override fun onFailure() {
if (isCanceled) {
SignalDatabase.attachments.deleteAttachment(attachmentId)
}
}
override fun onShouldRetry(exception: Exception): Boolean {
return exception is IOException && exception !is NotPushRegisteredException
}
@Throws(InvalidAttachmentException::class)
private fun buildAttachmentStream(attachment: Attachment, notification: NotificationController?, resumableUploadSpec: ResumableUpload): SignalServiceAttachmentStream {
if (attachment.uri == null || attachment.size == 0L) {
throw InvalidAttachmentException(IOException("Outgoing attachment has no data!"))
}
return try {
val inputStream = PartAuthority.getAttachmentStream(context, attachment.uri!!)
val builder = SignalServiceAttachment.newStreamBuilder()
.withStream(inputStream)
.withContentType(attachment.contentType)
.withLength(attachment.size)
.withFileName(attachment.fileName)
.withVoiceNote(attachment.isVoiceNote)
.withBorderless(attachment.isBorderless)
.withGif(attachment.isVideoGif)
.withWidth(attachment.width)
.withHeight(attachment.height)
.withUploadTimestamp(System.currentTimeMillis())
.withCaption(attachment.caption)
.withResumableUploadSpec(ResumableUploadSpec.from(resumableUploadSpec))
.withCancelationSignal { this.isCanceled }
.withListener(object : SignalServiceAttachment.ProgressListener {
override fun onAttachmentProgress(total: Long, progress: Long) {
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
notification?.setProgress(total, progress)
}
override fun shouldCancel(): Boolean {
return isCanceled
}
})
if (MediaUtil.isImageType(attachment.contentType)) {
builder.withBlurHash(getImageBlurHash(attachment)).build()
} else if (MediaUtil.isVideoType(attachment.contentType)) {
builder.withBlurHash(getVideoBlurHash(attachment)).build()
} else {
builder.build()
}
} catch (e: IOException) {
throw InvalidAttachmentException(e)
}
}
@Throws(IOException::class)
private fun getImageBlurHash(attachment: Attachment): String? {
if (attachment.blurHash != null) {
return attachment.blurHash!!.hash
}
if (attachment.uri == null) {
return null
}
return PartAuthority.getAttachmentStream(context, attachment.uri!!).use { inputStream ->
BlurHashEncoder.encode(inputStream)
}
}
@Throws(IOException::class)
private fun getVideoBlurHash(attachment: Attachment): String? {
if (attachment.blurHash != null) {
return attachment.blurHash!!.hash
}
if (Build.VERSION.SDK_INT < 23) {
Log.w(TAG, "Video thumbnails not supported...")
return null
}
return MediaUtil.getVideoThumbnail(context, Objects.requireNonNull(attachment.uri), 1000)?.let { bitmap ->
val thumb = Bitmap.createScaledBitmap(bitmap, 100, 100, false)
bitmap.recycle()
Log.i(TAG, "Generated video thumbnail...")
val hash = BlurHashEncoder.encode(thumb)
thumb.recycle()
hash
}
}
private inner class InvalidAttachmentException : Exception {
constructor(message: String?) : super(message)
constructor(e: Exception?) : super(e)
}
class Factory : Job.Factory<AttachmentUploadJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): AttachmentUploadJob {
val data = AttachmentUploadJobData.ADAPTER.decode(serializedData!!)
return AttachmentUploadJob(
parameters = parameters,
attachmentId = AttachmentId(data.attachmentRowId, data.attachmentUniqueId),
data.uploadSpec
)
}
}
}
@@ -95,11 +95,11 @@ public final class JobManagerFactories {
public static Map<String, Job.Factory> getJobFactories(@NonNull Application application) { public static Map<String, Job.Factory> getJobFactories(@NonNull Application application) {
return new HashMap<String, Job.Factory>() {{ return new HashMap<String, Job.Factory>() {{
put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory()); put(AccountConsistencyWorkerJob.KEY, new AccountConsistencyWorkerJob.Factory());
put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory());
put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory()); put(AttachmentCopyJob.KEY, new AttachmentCopyJob.Factory());
put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory()); put(AttachmentDownloadJob.KEY, new AttachmentDownloadJob.Factory());
put(AttachmentUploadJob.KEY, new AttachmentUploadJob.Factory());
put(AttachmentMarkUploadedJob.KEY, new AttachmentMarkUploadedJob.Factory()); put(AttachmentMarkUploadedJob.KEY, new AttachmentMarkUploadedJob.Factory());
put(AttachmentCompressionJob.KEY, new AttachmentCompressionJob.Factory()); put(AttachmentUploadJob.KEY, new AttachmentUploadJob.Factory());
put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory()); put(AutomaticSessionResetJob.KEY, new AutomaticSessionResetJob.Factory());
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory()); put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());
put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory()); put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory());
@@ -132,6 +132,7 @@ public final class JobManagerFactories {
put(IndividualSendJob.KEY, new IndividualSendJob.Factory()); put(IndividualSendJob.KEY, new IndividualSendJob.Factory());
put(LeaveGroupV2Job.KEY, new LeaveGroupV2Job.Factory()); put(LeaveGroupV2Job.KEY, new LeaveGroupV2Job.Factory());
put(LeaveGroupV2WorkerJob.KEY, new LeaveGroupV2WorkerJob.Factory()); put(LeaveGroupV2WorkerJob.KEY, new LeaveGroupV2WorkerJob.Factory());
put(LegacyAttachmentUploadJob.KEY, new LegacyAttachmentUploadJob.Factory());
put(LocalBackupJob.KEY, new LocalBackupJob.Factory()); put(LocalBackupJob.KEY, new LocalBackupJob.Factory());
put(LocalBackupJobApi29.KEY, new LocalBackupJobApi29.Factory()); put(LocalBackupJobApi29.KEY, new LocalBackupJobApi29.Factory());
put(MarkerJob.KEY, new MarkerJob.Factory()); put(MarkerJob.KEY, new MarkerJob.Factory());
@@ -26,21 +26,17 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.events.PartProgressEvent; import org.thoughtcrime.securesms.events.PartProgressEvent;
import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JsonJobData; import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.mms.PartAuthority; import org.thoughtcrime.securesms.mms.PartAuthority;
import org.thoughtcrime.securesms.net.NotPushRegisteredException; import org.thoughtcrime.securesms.net.NotPushRegisteredException;
import org.thoughtcrime.securesms.recipients.Recipient; import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.service.NotificationController; import org.thoughtcrime.securesms.service.NotificationController;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.thoughtcrime.securesms.util.MediaUtil; import org.thoughtcrime.securesms.util.MediaUtil;
import org.whispersystems.signalservice.api.SignalServiceMessageSender; import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream;
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException; import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException;
import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException; import org.whispersystems.signalservice.api.push.exceptions.ResumeLocationInvalidException;
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream;
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec; import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import java.io.IOException; import java.io.IOException;
@@ -54,12 +50,13 @@ import java.util.concurrent.TimeUnit;
* <p> * <p>
* Queue {@link AttachmentCompressionJob} before to compress. * Queue {@link AttachmentCompressionJob} before to compress.
*/ */
public final class AttachmentUploadJob extends BaseJob { @Deprecated
public final class LegacyAttachmentUploadJob extends BaseJob {
public static final String KEY = "AttachmentUploadJobV2"; public static final String KEY = "AttachmentUploadJobV2";
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final String TAG = Log.tag(AttachmentUploadJob.class); private static final String TAG = Log.tag(LegacyAttachmentUploadJob.class);
private static final long UPLOAD_REUSE_THRESHOLD = TimeUnit.DAYS.toMillis(3); private static final long UPLOAD_REUSE_THRESHOLD = TimeUnit.DAYS.toMillis(3);
@@ -72,27 +69,11 @@ public final class AttachmentUploadJob extends BaseJob {
*/ */
private static final int FOREGROUND_LIMIT = 10 * 1024 * 1024; private static final int FOREGROUND_LIMIT = 10 * 1024 * 1024;
public static long getMaxPlaintextSize() {
long maxCipherTextSize = FeatureFlags.maxAttachmentSizeBytes();
long maxPaddedSize = AttachmentCipherStreamUtil.getPlaintextLength(maxCipherTextSize);
return PaddingInputStream.getMaxUnpaddedSize(maxPaddedSize);
}
private final AttachmentId attachmentId; private final AttachmentId attachmentId;
private boolean forceV2; private boolean forceV2;
public AttachmentUploadJob(AttachmentId attachmentId) { private LegacyAttachmentUploadJob(@NonNull Job.Parameters parameters, @NonNull AttachmentId attachmentId, boolean forceV2) {
this(new Job.Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
attachmentId,
false);
}
private AttachmentUploadJob(@NonNull Job.Parameters parameters, @NonNull AttachmentId attachmentId, boolean forceV2) {
super(parameters); super(parameters);
this.attachmentId = attachmentId; this.attachmentId = attachmentId;
this.forceV2 = forceV2; this.forceV2 = forceV2;
@@ -289,12 +270,12 @@ public final class AttachmentUploadJob extends BaseJob {
} }
} }
public static final class Factory implements Job.Factory<AttachmentUploadJob> { public static final class Factory implements Job.Factory<LegacyAttachmentUploadJob> {
@Override @Override
public @NonNull AttachmentUploadJob create(@NonNull Parameters parameters, @Nullable byte[] serializedData) { public @NonNull LegacyAttachmentUploadJob create(@NonNull Parameters parameters, @Nullable byte[] serializedData) {
JsonJobData data = JsonJobData.deserialize(serializedData); JsonJobData data = JsonJobData.deserialize(serializedData);
return new AttachmentUploadJob(parameters, new AttachmentId(data.getLong(KEY_ROW_ID), data.getLong(KEY_UNIQUE_ID)), data.getBooleanOrDefault(KEY_FORCE_V2, false)); return new LegacyAttachmentUploadJob(parameters, new AttachmentId(data.getLong(KEY_ROW_ID), data.getLong(KEY_UNIQUE_ID)), data.getBooleanOrDefault(KEY_FORCE_V2, false));
} }
} }
} }
@@ -242,16 +242,9 @@ public abstract class PushSendJob extends SendJob {
return new HashSet<>(Stream.of(attachments).map(a -> { return new HashSet<>(Stream.of(attachments).map(a -> {
AttachmentUploadJob attachmentUploadJob = new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId()); AttachmentUploadJob attachmentUploadJob = new AttachmentUploadJob(((DatabaseAttachment) a).getAttachmentId());
if (message.isGroup()) { jobManager.startChain(AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1))
jobManager.startChain(AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1)) .then(attachmentUploadJob)
.then(attachmentUploadJob) .enqueue();
.enqueue();
} else {
jobManager.startChain(AttachmentCompressionJob.fromAttachment((DatabaseAttachment) a, false, -1))
.then(new ResumableUploadSpecJob())
.then(attachmentUploadJob)
.enqueue();
}
return attachmentUploadJob.getId(); return attachmentUploadJob.getId();
}) })
@@ -13,6 +13,10 @@ import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* No longer used. Functionality has been merged into {@link AttachmentUploadJob}.
*/
@Deprecated
public class ResumableUploadSpecJob extends BaseJob { public class ResumableUploadSpecJob extends BaseJob {
private static final String TAG = Log.tag(ResumableUploadSpecJob.class); private static final String TAG = Log.tag(ResumableUploadSpecJob.class);
@@ -21,14 +25,6 @@ public class ResumableUploadSpecJob extends BaseJob {
public static final String KEY = "ResumableUploadSpecJob"; public static final String KEY = "ResumableUploadSpecJob";
public ResumableUploadSpecJob() {
this(new Job.Parameters.Builder()
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build());
}
private ResumableUploadSpecJob(@NonNull Parameters parameters) { private ResumableUploadSpecJob(@NonNull Parameters parameters) {
super(parameters); super(parameters);
} }
@@ -59,7 +59,6 @@ import org.thoughtcrime.securesms.jobs.PushGroupSendJob;
import org.thoughtcrime.securesms.jobs.IndividualSendJob; import org.thoughtcrime.securesms.jobs.IndividualSendJob;
import org.thoughtcrime.securesms.jobs.ReactionSendJob; import org.thoughtcrime.securesms.jobs.ReactionSendJob;
import org.thoughtcrime.securesms.jobs.RemoteDeleteSendJob; import org.thoughtcrime.securesms.jobs.RemoteDeleteSendJob;
import org.thoughtcrime.securesms.jobs.ResumableUploadSpecJob;
import org.thoughtcrime.securesms.jobs.SmsSendJob; import org.thoughtcrime.securesms.jobs.SmsSendJob;
import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.linkpreview.LinkPreview; import org.thoughtcrime.securesms.linkpreview.LinkPreview;
@@ -418,17 +417,15 @@ public class MessageSender {
AttachmentTable attachmentDatabase = SignalDatabase.attachments(); AttachmentTable attachmentDatabase = SignalDatabase.attachments();
DatabaseAttachment databaseAttachment = attachmentDatabase.insertAttachmentForPreUpload(attachment); DatabaseAttachment databaseAttachment = attachmentDatabase.insertAttachmentForPreUpload(attachment);
Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1); Job compressionJob = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1);
Job resumableUploadSpecJob = new ResumableUploadSpecJob(); Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());
Job uploadJob = new AttachmentUploadJob(databaseAttachment.getAttachmentId());
ApplicationDependencies.getJobManager() ApplicationDependencies.getJobManager()
.startChain(compressionJob) .startChain(compressionJob)
.then(resumableUploadSpecJob)
.then(uploadJob) .then(uploadJob)
.enqueue(); .enqueue();
return new PreUploadResult(media, databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), resumableUploadSpecJob.getId(), uploadJob.getId())); return new PreUploadResult(media, databaseAttachment.getAttachmentId(), Arrays.asList(compressionJob.getId(), uploadJob.getId()));
} catch (MmsException e) { } catch (MmsException e) {
Log.w(TAG, "preUploadPushAttachment() - Failed to upload!", e); Log.w(TAG, "preUploadPushAttachment() - Failed to upload!", e);
return null; return null;
@@ -11,7 +11,6 @@ import org.thoughtcrime.securesms.jobmanager.JobManager
import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob
import org.thoughtcrime.securesms.jobs.AttachmentCopyJob import org.thoughtcrime.securesms.jobs.AttachmentCopyJob
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.jobs.ResumableUploadSpecJob
import org.thoughtcrime.securesms.mms.OutgoingMessage import org.thoughtcrime.securesms.mms.OutgoingMessage
/** /**
@@ -196,12 +195,10 @@ class UploadDependencyGraph private constructor(
*/ */
private fun createAttachmentUploadChain(jobManager: JobManager, databaseAttachment: DatabaseAttachment): Pair<JobId, JobManager.Chain> { private fun createAttachmentUploadChain(jobManager: JobManager, databaseAttachment: DatabaseAttachment): Pair<JobId, JobManager.Chain> {
val compressionJob: Job = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1) val compressionJob: Job = AttachmentCompressionJob.fromAttachment(databaseAttachment, false, -1)
val resumableUploadSpecJob: Job = ResumableUploadSpecJob()
val uploadJob: Job = AttachmentUploadJob(databaseAttachment.attachmentId) val uploadJob: Job = AttachmentUploadJob(databaseAttachment.attachmentId)
return uploadJob.id to jobManager return uploadJob.id to jobManager
.startChain(compressionJob) .startChain(compressionJob)
.then(resumableUploadSpecJob)
.then(uploadJob) .then(uploadJob)
} }
} }
+9
View File
@@ -2,9 +2,12 @@ syntax = "proto3";
package signal; package signal;
import "ResumableUploads.proto";
option java_package = "org.thoughtcrime.securesms.jobs.protos"; option java_package = "org.thoughtcrime.securesms.jobs.protos";
option java_multiple_files = true; option java_multiple_files = true;
message CallSyncEventJobRecord { message CallSyncEventJobRecord {
uint64 recipientId = 1; uint64 recipientId = 1;
reserved 2; reserved 2;
@@ -28,3 +31,9 @@ message CallLogEventSendJobData {
message CallLinkUpdateSendJobData { message CallLinkUpdateSendJobData {
string callLinkRoomId = 1; string callLinkRoomId = 1;
} }
message AttachmentUploadJobData {
uint64 attachmentRowId = 1;
uint64 attachmentUniqueId = 2;
optional ResumableUpload uploadSpec = 3;
}
@@ -21,7 +21,7 @@ import org.thoughtcrime.securesms.jobmanager.JsonJobData
import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob import org.thoughtcrime.securesms.jobs.AttachmentCompressionJob
import org.thoughtcrime.securesms.jobs.AttachmentCopyJob import org.thoughtcrime.securesms.jobs.AttachmentCopyJob
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.jobs.ResumableUploadSpecJob import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData
import org.thoughtcrime.securesms.mms.OutgoingMessage import org.thoughtcrime.securesms.mms.OutgoingMessage
import org.thoughtcrime.securesms.mms.SentMediaQuality import org.thoughtcrime.securesms.mms.SentMediaQuality
import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.recipients.Recipient
@@ -206,16 +206,15 @@ class UploadDependencyGraphTest {
assertTrue(steps.all { it.size == 1 }) assertTrue(steps.all { it.size == 1 })
assertTrue(steps[0][0] is AttachmentCompressionJob) assertTrue(steps[0][0] is AttachmentCompressionJob)
assertTrue(steps[1][0] is ResumableUploadSpecJob) assertTrue(steps[1][0] is AttachmentUploadJob)
assertTrue(steps[2][0] is AttachmentUploadJob)
if (expectedCopyDestinationCount > 0) { if (expectedCopyDestinationCount > 0) {
assertTrue(steps[3][0] is AttachmentCopyJob) assertTrue(steps[2][0] is AttachmentCopyJob)
val uploadData = JsonJobData.deserialize(steps[2][0].serialize()) val uploadData = AttachmentUploadJobData.ADAPTER.decode(steps[1][0].serialize()!!)
val copyData = JsonJobData.deserialize(steps[3][0].serialize()) val copyData = JsonJobData.deserialize(steps[2][0].serialize())
val uploadAttachmentId = AttachmentId(uploadData.getLong("row_id"), uploadData.getLong("unique_id")) val uploadAttachmentId = AttachmentId(uploadData.attachmentRowId, uploadData.attachmentUniqueId)
val copySourceAttachmentId = JsonUtils.fromJson(copyData.getString("source_id"), AttachmentId::class.java) val copySourceAttachmentId = JsonUtils.fromJson(copyData.getString("source_id"), AttachmentId::class.java)
assertEquals(uploadAttachmentId, copySourceAttachmentId) assertEquals(uploadAttachmentId, copySourceAttachmentId)
@@ -223,7 +222,7 @@ class UploadDependencyGraphTest {
val copyDestinations = copyData.getStringArray("destination_ids") val copyDestinations = copyData.getStringArray("destination_ids")
assertEquals(expectedCopyDestinationCount, copyDestinations.size) assertEquals(expectedCopyDestinationCount, copyDestinations.size)
} else { } else {
assertEquals(3, steps.size) assertEquals(2, steps.size)
} }
} }
@@ -8,15 +8,27 @@ package org.signal.core.util
inline val Long.bytes: ByteSize inline val Long.bytes: ByteSize
get() = ByteSize(this) get() = ByteSize(this)
inline val Int.bytes: ByteSize
get() = ByteSize(this.toLong())
inline val Long.kibiBytes: ByteSize inline val Long.kibiBytes: ByteSize
get() = (this * 1024).bytes get() = (this * 1024).bytes
inline val Int.kibiBytes: ByteSize
get() = (this * 1024).bytes
inline val Long.mebiBytes: ByteSize inline val Long.mebiBytes: ByteSize
get() = (this * 1024).kibiBytes get() = (this * 1024).kibiBytes
inline val Int.mebiBytes: ByteSize
get() = (this * 1024).kibiBytes
inline val Long.gibiBytes: ByteSize inline val Long.gibiBytes: ByteSize
get() = (this * 1024).mebiBytes get() = (this * 1024).mebiBytes
inline val Int.gibiBytes: ByteSize
get() = (this * 1024).mebiBytes
class ByteSize(val bytes: Long) { class ByteSize(val bytes: Long) {
val inWholeBytes: Long val inWholeBytes: Long
get() = bytes get() = bytes
@@ -0,0 +1,14 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util
import kotlin.time.Duration
import kotlin.time.DurationUnit
fun Duration.inRoundedMilliseconds(places: Int = 2) = this.toDouble(DurationUnit.MILLISECONDS).roundedString(places)
fun Duration.inRoundedMinutes(places: Int = 2) = this.toDouble(DurationUnit.MINUTES).roundedString(places)
fun Duration.inRoundedHours(places: Int = 2) = this.toDouble(DurationUnit.HOURS).roundedString(places)
fun Duration.inRoundedDays(places: Int = 2) = this.toDouble(DurationUnit.DAYS).roundedString(places)
@@ -798,6 +798,7 @@ public class SignalServiceMessageSender {
if (attachment.getResumableUploadSpec().isPresent()) { if (attachment.getResumableUploadSpec().isPresent()) {
return uploadAttachmentV4(attachment, attachmentKey, attachmentData); return uploadAttachmentV4(attachment, attachmentKey, attachmentData);
} else { } else {
Log.w(TAG, "Using legacy attachment upload endpoint.");
return uploadAttachmentV2(attachment, attachmentKey, attachmentData); return uploadAttachmentV2(attachment, attachmentKey, attachmentData);
} }
} }
@@ -6,7 +6,6 @@ import org.signal.core.util.Base64;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -68,15 +67,15 @@ public final class ResumableUploadSpec {
return headers; return headers;
} }
public String serialize() { public ResumableUpload toProto() {
ResumableUpload.Builder builder = new ResumableUpload.Builder() ResumableUpload.Builder builder = new ResumableUpload.Builder()
.secretKey(ByteString.of(getSecretKey())) .secretKey(ByteString.of(getSecretKey()))
.iv(ByteString.of(getIV())) .iv(ByteString.of(getIV()))
.timeout(getExpirationTimestamp()) .timeout(getExpirationTimestamp())
.cdnNumber(getCdnNumber()) .cdnNumber(getCdnNumber())
.cdnKey(getCdnKey()) .cdnKey(getCdnKey())
.location(getResumeLocation()) .location(getResumeLocation())
.timeout(getExpirationTimestamp()); .timeout(getExpirationTimestamp());
builder.headers( builder.headers(
headers.entrySet() headers.entrySet()
@@ -85,31 +84,38 @@ public final class ResumableUploadSpec {
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
return Base64.encodeWithPadding(builder.build().encode()); return builder.build();
}
public String serialize() {
return Base64.encodeWithPadding(toProto().encode());
} }
public static ResumableUploadSpec deserialize(String serializedSpec) throws ResumeLocationInvalidException { public static ResumableUploadSpec deserialize(String serializedSpec) throws ResumeLocationInvalidException {
if (serializedSpec == null) return null;
try { try {
ResumableUpload resumableUpload = ResumableUpload.ADAPTER.decode(Base64.decode(serializedSpec)); ResumableUpload resumableUpload = ResumableUpload.ADAPTER.decode(Base64.decode(serializedSpec));
return from(resumableUpload);
Map<String, String> headers = new HashMap<>();
for (ResumableUpload.Header header : resumableUpload.headers) {
headers.put(header.key, header.value_);
}
return new ResumableUploadSpec(
resumableUpload.secretKey.toByteArray(),
resumableUpload.iv.toByteArray(),
resumableUpload.cdnKey,
resumableUpload.cdnNumber,
resumableUpload.location,
resumableUpload.timeout,
headers
);
} catch (IOException e) { } catch (IOException e) {
throw new ResumeLocationInvalidException(); throw new ResumeLocationInvalidException();
} }
} }
public static ResumableUploadSpec from(ResumableUpload resumableUpload) throws ResumeLocationInvalidException {
if (resumableUpload == null) return null;
Map<String, String> headers = new HashMap<>();
for (ResumableUpload.Header header : resumableUpload.headers) {
headers.put(header.key, header.value_);
}
return new ResumableUploadSpec(
resumableUpload.secretKey.toByteArray(),
resumableUpload.iv.toByteArray(),
resumableUpload.cdnKey,
resumableUpload.cdnNumber,
resumableUpload.location,
resumableUpload.timeout,
headers
);
}
} }