mirror of
https://github.com/signalapp/Signal-Android.git
synced 2026-02-21 10:17:56 +00:00
Upload large backup attachments using a foreground service.
This commit is contained in:
@@ -9,6 +9,7 @@ import android.content.Context
|
||||
import android.graphics.Bitmap
|
||||
import android.os.Build
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.mebiBytes
|
||||
import org.signal.protos.resumableuploads.ResumableUpload
|
||||
import org.thoughtcrime.securesms.blurhash.BlurHashEncoder
|
||||
import org.thoughtcrime.securesms.mms.PartAuthority
|
||||
@@ -27,6 +28,11 @@ object AttachmentUploadUtil {
|
||||
|
||||
private val TAG = Log.tag(AttachmentUploadUtil::class.java)
|
||||
|
||||
/**
|
||||
* Foreground notification shows while uploading attachments larger than this.
|
||||
*/
|
||||
val FOREGROUND_LIMIT_BYTES: Long = 10.mebiBytes.inWholeBytes
|
||||
|
||||
/**
|
||||
* Builds a [SignalServiceAttachmentStream] from the provided data, which can then be provided to various upload methods.
|
||||
*/
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.thoughtcrime.securesms.jobs.BackfillDigestJob
|
||||
import org.thoughtcrime.securesms.jobs.UploadAttachmentToArchiveJob
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.keyvalue.protos.ArchiveUploadProgressState
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.math.max
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
@@ -143,12 +144,12 @@ object ArchiveUploadProgress {
|
||||
}
|
||||
}
|
||||
|
||||
fun onMessageBackupUploadProgress(totalBytes: Long, bytesUploaded: Long) {
|
||||
fun onMessageBackupUploadProgress(progress: AttachmentTransferProgress) {
|
||||
updateState {
|
||||
it.copy(
|
||||
state = ArchiveUploadProgressState.State.UploadBackupFile,
|
||||
backupFileUploadedBytes = bytesUploaded,
|
||||
backupFileTotalBytes = totalBytes
|
||||
backupFileUploadedBytes = progress.transmitted.inWholeBytes,
|
||||
backupFileTotalBytes = progress.total.inWholeBytes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.events;
|
||||
import androidx.annotation.NonNull;
|
||||
|
||||
import org.thoughtcrime.securesms.attachments.Attachment;
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress;
|
||||
|
||||
public final class PartProgressEvent {
|
||||
|
||||
@@ -22,4 +23,11 @@ public final class PartProgressEvent {
|
||||
this.total = total;
|
||||
this.progress = progress;
|
||||
}
|
||||
|
||||
public PartProgressEvent(@NonNull Attachment attachment, @NonNull Type type, @NonNull AttachmentTransferProgress progress) {
|
||||
this.attachment = attachment;
|
||||
this.type = type;
|
||||
this.total = progress.getTotal().getInWholeBytes();
|
||||
this.progress = progress.getTransmitted().getInWholeBytes();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,7 +271,7 @@ public final class AttachmentCompressionJob extends BaseJob {
|
||||
try (OutputStream outputStream = ModernEncryptingPartOutputStream.createFor(attachmentSecret, file, true).second) {
|
||||
mdatLength = (int) transcoder.transcode(percent -> {
|
||||
if (notification != null) {
|
||||
notification.setProgress(percent / 100f);
|
||||
notification.updateProgress(percent / 100f);
|
||||
}
|
||||
eventBus.postSticky(new PartProgressEvent(attachment,
|
||||
PartProgressEvent.Type.COMPRESSION,
|
||||
|
||||
@@ -36,6 +36,7 @@ import org.thoughtcrime.securesms.transport.RetryLaterException
|
||||
import org.thoughtcrime.securesms.util.AttachmentUtil
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.thoughtcrime.securesms.util.Util
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId
|
||||
@@ -279,8 +280,8 @@ class AttachmentDownloadJob private constructor(
|
||||
val pointer = createAttachmentPointer(attachment)
|
||||
|
||||
val progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress))
|
||||
}
|
||||
|
||||
override fun shouldCancel(): Boolean {
|
||||
|
||||
@@ -11,7 +11,6 @@ import org.signal.core.util.Base64
|
||||
import org.signal.core.util.concurrent.SignalExecutors
|
||||
import org.signal.core.util.inRoundedDays
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.mebiBytes
|
||||
import org.signal.protos.resumableuploads.ResumableUpload
|
||||
import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.attachments.Attachment
|
||||
@@ -34,6 +33,7 @@ import org.thoughtcrime.securesms.service.AttachmentProgressService
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream
|
||||
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException
|
||||
@@ -65,11 +65,6 @@ class AttachmentUploadJob private constructor(
|
||||
|
||||
val UPLOAD_REUSE_THRESHOLD = 3.days.inWholeMilliseconds
|
||||
|
||||
/**
|
||||
* Foreground notification shows while uploading attachments above this.
|
||||
*/
|
||||
private val FOREGROUND_LIMIT = 10.mebiBytes.inWholeBytes
|
||||
|
||||
@JvmStatic
|
||||
val maxPlaintextSize: Long
|
||||
get() {
|
||||
@@ -225,7 +220,7 @@ class AttachmentUploadJob private constructor(
|
||||
}
|
||||
|
||||
private fun getAttachmentNotificationIfNeeded(attachment: Attachment): AttachmentProgressService.Controller? {
|
||||
return if (attachment.size >= FOREGROUND_LIMIT) {
|
||||
return if (attachment.size >= AttachmentUploadUtil.FOREGROUND_LIMIT_BYTES) {
|
||||
AttachmentProgressService.start(context, context.getString(R.string.AttachmentUploadJob_uploading_media))
|
||||
} else {
|
||||
null
|
||||
@@ -264,17 +259,10 @@ class AttachmentUploadJob private constructor(
|
||||
uploadSpec = resumableUploadSpec,
|
||||
cancellationSignal = { isCanceled },
|
||||
progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
private var lastUpdate = 0L
|
||||
private val updateRate = 500.milliseconds.inWholeMilliseconds
|
||||
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
val now = System.currentTimeMillis()
|
||||
if (now < lastUpdate || lastUpdate + updateRate < now || progress >= total) {
|
||||
SignalExecutors.BOUNDED_IO.execute {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
|
||||
notification?.progress = (progress.toFloat() / total)
|
||||
}
|
||||
lastUpdate = now
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
SignalExecutors.BOUNDED_IO.execute {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress))
|
||||
notification?.updateProgress(progress.value)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.thoughtcrime.securesms.jobs.protos.BackupMessagesJobData
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.providers.BlobProvider
|
||||
import org.whispersystems.signalservice.api.NetworkResult
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.internal.push.AttachmentUploadForm
|
||||
import java.io.File
|
||||
@@ -138,8 +139,8 @@ class BackupMessagesJob private constructor(
|
||||
}
|
||||
|
||||
val progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
ArchiveUploadProgress.onMessageBackupUploadProgress(total, progress)
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
ArchiveUploadProgress.onMessageBackupUploadProgress(progress)
|
||||
}
|
||||
|
||||
override fun shouldCancel(): Boolean = isCanceled
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
package org.thoughtcrime.securesms.jobs
|
||||
|
||||
import org.greenrobot.eventbus.EventBus
|
||||
import org.signal.core.util.bytes
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.libsignal.zkgroup.profiles.ProfileKey
|
||||
import org.thoughtcrime.securesms.R
|
||||
@@ -23,6 +22,7 @@ import org.thoughtcrime.securesms.providers.BlobProvider
|
||||
import org.thoughtcrime.securesms.recipients.Recipient
|
||||
import org.thoughtcrime.securesms.service.BackupProgressService
|
||||
import org.whispersystems.signalservice.api.NetworkResult
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener
|
||||
import java.io.IOException
|
||||
|
||||
@@ -71,13 +71,13 @@ class BackupRestoreJob private constructor(parameters: Parameters) : BaseJob(par
|
||||
SignalStore.backup.restoreState = RestoreState.RESTORING_DB
|
||||
|
||||
val progressListener = object : ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
controller.update(
|
||||
title = context.getString(R.string.BackupProgressService_title_downloading),
|
||||
progress = progress.toFloat() / total.toFloat(),
|
||||
progress = progress.value,
|
||||
indeterminate = false
|
||||
)
|
||||
EventBus.getDefault().post(RestoreV2Event(RestoreV2Event.Type.PROGRESS_DOWNLOAD, progress.bytes, total.bytes))
|
||||
EventBus.getDefault().post(RestoreV2Event(RestoreV2Event.Type.PROGRESS_DOWNLOAD, progress.transmitted, progress.total))
|
||||
}
|
||||
|
||||
override fun shouldCancel() = isCanceled
|
||||
|
||||
@@ -165,7 +165,7 @@ class BackupSubscriptionCheckJob private constructor(parameters: Parameters) : C
|
||||
*/
|
||||
private fun checkForFailedOrCanceledSubscriptionState(activeSubscription: ActiveSubscription?) {
|
||||
val containsFailedPaymentOrCancellation = activeSubscription?.isFailedPayment == true || activeSubscription?.isCanceled == true
|
||||
if (containsFailedPaymentOrCancellation && activeSubscription.activeSubscription != null) {
|
||||
if (containsFailedPaymentOrCancellation && activeSubscription?.activeSubscription != null) {
|
||||
Log.i(TAG, "Subscription either has a payment failure or has been canceled.")
|
||||
|
||||
val response = SignalNetwork.account.whoAmI()
|
||||
|
||||
@@ -16,8 +16,6 @@ import androidx.annotation.Nullable;
|
||||
import com.annimon.stream.Stream;
|
||||
|
||||
import org.greenrobot.eventbus.EventBus;
|
||||
import org.greenrobot.eventbus.Subscribe;
|
||||
import org.greenrobot.eventbus.ThreadMode;
|
||||
import org.signal.core.util.Hex;
|
||||
import org.signal.core.util.logging.Log;
|
||||
import org.signal.libsignal.metadata.certificate.InvalidCertificateException;
|
||||
@@ -67,6 +65,7 @@ import org.thoughtcrime.securesms.util.RemoteConfig;
|
||||
import org.thoughtcrime.securesms.util.ImageCompressionUtil;
|
||||
import org.thoughtcrime.securesms.util.MediaUtil;
|
||||
import org.thoughtcrime.securesms.util.Util;
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId;
|
||||
@@ -90,7 +89,6 @@ import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -219,8 +217,8 @@ public abstract class PushSendJob extends SendJob {
|
||||
.withResumableUploadSpec(AppDependencies.getSignalServiceMessageSender().getResumableUploadSpec())
|
||||
.withListener(new SignalServiceAttachment.ProgressListener() {
|
||||
@Override
|
||||
public void onAttachmentProgress(long total, long progress) {
|
||||
EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress));
|
||||
public void onAttachmentProgress(@NonNull AttachmentTransferProgress progress) {
|
||||
EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -38,6 +38,7 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels
|
||||
import org.thoughtcrime.securesms.notifications.NotificationIds
|
||||
import org.thoughtcrime.securesms.transport.RetryLaterException
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.api.push.exceptions.MissingConfigurationException
|
||||
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
|
||||
@@ -230,8 +231,8 @@ class RestoreAttachmentJob private constructor(
|
||||
val pointer = attachment.createArchiveAttachmentPointer(useArchiveCdn)
|
||||
|
||||
val progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress))
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress))
|
||||
}
|
||||
|
||||
override fun shouldCancel(): Boolean {
|
||||
|
||||
@@ -20,6 +20,7 @@ import org.thoughtcrime.securesms.jobmanager.JsonJobData
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.util.RemoteConfig
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.api.push.exceptions.MissingConfigurationException
|
||||
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
|
||||
@@ -122,7 +123,7 @@ class RestoreAttachmentThumbnailJob private constructor(
|
||||
val thumbnailFile: File = SignalDatabase.attachments.createArchiveThumbnailTransferFile()
|
||||
|
||||
val progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) = Unit
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) = Unit
|
||||
override fun shouldCancel(): Boolean = this@RestoreAttachmentThumbnailJob.isCanceled
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.signal.core.util.isNotNullOrBlank
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.readLength
|
||||
import org.signal.protos.resumableuploads.ResumableUpload
|
||||
import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentId
|
||||
import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil
|
||||
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
|
||||
@@ -24,9 +25,11 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.jobs.protos.UploadAttachmentToArchiveJobData
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.net.SignalNetwork
|
||||
import org.thoughtcrime.securesms.service.AttachmentProgressService
|
||||
import org.whispersystems.signalservice.api.NetworkResult
|
||||
import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes
|
||||
import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import java.io.FileNotFoundException
|
||||
import java.io.IOException
|
||||
@@ -49,7 +52,7 @@ class UploadAttachmentToArchiveJob private constructor(
|
||||
companion object {
|
||||
private val TAG = Log.tag(UploadAttachmentToArchiveJob::class)
|
||||
const val KEY = "UploadAttachmentToArchiveJob"
|
||||
const val MAX_JOB_QUEUES = 2
|
||||
private const val MAX_JOB_QUEUES = 2
|
||||
|
||||
/**
|
||||
* This randomly selects between one of [MAX_JOB_QUEUES] queues. It's a fun way of limiting the concurrency of the upload jobs to
|
||||
@@ -151,6 +154,12 @@ class UploadAttachmentToArchiveJob private constructor(
|
||||
Log.d(TAG, "[$attachmentId] Already have an upload spec. Continuing...")
|
||||
}
|
||||
|
||||
val progressServiceController = if (attachment.size >= AttachmentUploadUtil.FOREGROUND_LIMIT_BYTES) {
|
||||
AttachmentProgressService.start(context, context.getString(R.string.AttachmentUploadJob_uploading_media))
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
val attachmentStream = try {
|
||||
AttachmentUploadUtil.buildSignalServiceAttachmentStream(
|
||||
context = context,
|
||||
@@ -158,7 +167,11 @@ class UploadAttachmentToArchiveJob private constructor(
|
||||
uploadSpec = uploadSpec!!,
|
||||
cancellationSignal = { this.isCanceled },
|
||||
progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) = ArchiveUploadProgress.onAttachmentProgress(attachmentId, progress)
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
ArchiveUploadProgress.onAttachmentProgress(attachmentId, progress.transmitted.inWholeBytes)
|
||||
progressServiceController?.updateProgress(progress.value)
|
||||
}
|
||||
|
||||
override fun shouldCancel() = this@UploadAttachmentToArchiveJob.isCanceled
|
||||
}
|
||||
)
|
||||
@@ -172,34 +185,37 @@ class UploadAttachmentToArchiveJob private constructor(
|
||||
}
|
||||
|
||||
Log.d(TAG, "[$attachmentId] Beginning upload...")
|
||||
val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) {
|
||||
is NetworkResult.Success -> result.result
|
||||
is NetworkResult.ApplicationError -> throw result.throwable
|
||||
is NetworkResult.NetworkError -> {
|
||||
Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception)
|
||||
progressServiceController.use {
|
||||
val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) {
|
||||
is NetworkResult.Success -> result.result
|
||||
is NetworkResult.ApplicationError -> throw result.throwable
|
||||
is NetworkResult.NetworkError -> {
|
||||
Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception)
|
||||
|
||||
if (result.exception.cause is ProtocolException) {
|
||||
Log.w(TAG, "[$attachmentId] Length may be incorrect. Recalculating.", result.exception)
|
||||
if (result.exception.cause is ProtocolException) {
|
||||
Log.w(TAG, "[$attachmentId] Length may be incorrect. Recalculating.", result.exception)
|
||||
|
||||
val actualLength = SignalDatabase.attachments.getAttachmentStream(attachmentId, 0).readLength()
|
||||
if (actualLength != attachment.size) {
|
||||
Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception)
|
||||
SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength)
|
||||
} else {
|
||||
Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.")
|
||||
val actualLength = SignalDatabase.attachments.getAttachmentStream(attachmentId, 0).readLength()
|
||||
if (actualLength != attachment.size) {
|
||||
Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception)
|
||||
SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength)
|
||||
} else {
|
||||
Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.")
|
||||
}
|
||||
}
|
||||
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
|
||||
return Result.retry(defaultBackoff())
|
||||
is NetworkResult.StatusCodeError -> {
|
||||
Log.w(TAG, "[$attachmentId] Failed to upload due to status code error. Code: ${result.code}", result.exception)
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
}
|
||||
is NetworkResult.StatusCodeError -> {
|
||||
Log.w(TAG, "[$attachmentId] Failed to upload due to status code error. Code: ${result.code}", result.exception)
|
||||
return Result.retry(defaultBackoff())
|
||||
}
|
||||
}
|
||||
Log.d(TAG, "[$attachmentId] Upload complete!")
|
||||
|
||||
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult)
|
||||
Log.d(TAG, "[$attachmentId] Upload complete!")
|
||||
SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult)
|
||||
}
|
||||
|
||||
if (!isCanceled) {
|
||||
AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId))
|
||||
|
||||
@@ -10,8 +10,15 @@ import android.app.PendingIntent
|
||||
import android.content.Context
|
||||
import android.content.Intent
|
||||
import androidx.core.app.NotificationCompat
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.launch
|
||||
import org.signal.core.util.PendingIntentFlags
|
||||
import org.signal.core.util.logging.Log
|
||||
import org.signal.core.util.throttleLatest
|
||||
import org.thoughtcrime.securesms.MainActivity
|
||||
import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.notifications.NotificationChannels
|
||||
@@ -19,6 +26,7 @@ import org.thoughtcrime.securesms.notifications.NotificationIds
|
||||
import java.util.concurrent.CopyOnWriteArraySet
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
/**
|
||||
* A service to show attachment progress. In order to ensure we only show one status notification,
|
||||
@@ -60,7 +68,7 @@ class AttachmentProgressService : SafeForegroundService() {
|
||||
controllerLock.withLock {
|
||||
val started = if (controllers.isEmpty()) {
|
||||
Log.i(TAG, "[start] First controller. Starting.")
|
||||
SafeForegroundService.start(context, AttachmentProgressService::class.java)
|
||||
start(context, AttachmentProgressService::class.java)
|
||||
} else {
|
||||
Log.i(TAG, "[start] No need to start the service again. Already have an active controller.")
|
||||
true
|
||||
@@ -78,7 +86,7 @@ class AttachmentProgressService : SafeForegroundService() {
|
||||
}
|
||||
|
||||
private fun stop(context: Context) {
|
||||
SafeForegroundService.stop(context, AttachmentProgressService::class.java)
|
||||
stop(context, AttachmentProgressService::class.java)
|
||||
}
|
||||
|
||||
private fun onControllersChanged(context: Context) {
|
||||
@@ -132,6 +140,17 @@ class AttachmentProgressService : SafeForegroundService() {
|
||||
}
|
||||
|
||||
class Controller(private val context: Context, title: String) : AutoCloseable {
|
||||
private val coroutineScope = CoroutineScope(Dispatchers.IO)
|
||||
private val progressFlow = MutableSharedFlow<Float>(replay = 0, extraBufferCapacity = 1)
|
||||
|
||||
init {
|
||||
coroutineScope.launch {
|
||||
progressFlow
|
||||
.throttleLatest(500.milliseconds) // avoid OS notification rate limiting
|
||||
.collectLatest { progress = it }
|
||||
}
|
||||
}
|
||||
|
||||
var title: String = title
|
||||
set(value) {
|
||||
field = value
|
||||
@@ -139,7 +158,7 @@ class AttachmentProgressService : SafeForegroundService() {
|
||||
}
|
||||
|
||||
var progress: Float = 0f
|
||||
set(value) {
|
||||
private set(value) {
|
||||
field = value
|
||||
indeterminate = false
|
||||
onControllersChanged(context)
|
||||
@@ -155,8 +174,13 @@ class AttachmentProgressService : SafeForegroundService() {
|
||||
onControllersChanged(context)
|
||||
}
|
||||
|
||||
fun updateProgress(progress: Float) {
|
||||
progressFlow.tryEmit(progress)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
controllerLock.withLock {
|
||||
coroutineScope.cancel()
|
||||
controllers.remove(this)
|
||||
onControllersChanged(context)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
*/
|
||||
package org.whispersystems.signalservice.api.messages
|
||||
|
||||
import org.signal.core.util.ByteSize
|
||||
import org.signal.core.util.bytes
|
||||
import org.whispersystems.signalservice.internal.push.http.CancelationSignal
|
||||
import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec
|
||||
import java.io.InputStream
|
||||
@@ -160,11 +162,8 @@ abstract class SignalServiceAttachment protected constructor(val contentType: St
|
||||
interface ProgressListener {
|
||||
/**
|
||||
* Called on a progress change event.
|
||||
*
|
||||
* @param total The total amount to transmit/receive in bytes.
|
||||
* @param progress The amount that has been transmitted/received in bytes thus far
|
||||
*/
|
||||
fun onAttachmentProgress(total: Long, progress: Long)
|
||||
fun onAttachmentProgress(progress: AttachmentTransferProgress)
|
||||
|
||||
fun shouldCancel(): Boolean
|
||||
}
|
||||
@@ -176,3 +175,21 @@ abstract class SignalServiceAttachment protected constructor(val contentType: St
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Progress status for an attachment upload/download operation.
|
||||
*/
|
||||
data class AttachmentTransferProgress(
|
||||
/** The total amount of bytes to transmit/receive. */
|
||||
val total: ByteSize,
|
||||
|
||||
/** The amount of bytes that have been transmitted/received thus far. */
|
||||
val transmitted: ByteSize
|
||||
) {
|
||||
constructor(total: Long, transmitted: Long) : this(total.bytes, transmitted.bytes)
|
||||
|
||||
/**
|
||||
* The fractional progress as a float value between 0.0 and 1.0 (inclusive).
|
||||
*/
|
||||
val value = transmitted.inWholeBytes.toFloat() / total.inWholeBytes
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.whispersystems.signalservice.api.account.AccountAttributes;
|
||||
import org.whispersystems.signalservice.api.account.PreKeyCollection;
|
||||
import org.whispersystems.signalservice.api.crypto.SealedSenderAccess;
|
||||
import org.whispersystems.signalservice.api.groupsv2.GroupsV2AuthorizationString;
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener;
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId;
|
||||
import org.whispersystems.signalservice.api.messages.calls.CallingResponse;
|
||||
@@ -653,7 +654,7 @@ public class PushServiceSocket {
|
||||
if ((totalRead += read) > maxSizeBytes) throw new PushNetworkException("Response exceeded max size!");
|
||||
|
||||
if (listener != null) {
|
||||
listener.onAttachmentProgress(body.contentLength() + offset, totalRead);
|
||||
listener.onAttachmentProgress(new AttachmentTransferProgress(body.contentLength() + offset, totalRead));
|
||||
if (listener.shouldCancel()) {
|
||||
call.cancel();
|
||||
throw new PushNetworkException("Canceled by listener check.");
|
||||
|
||||
@@ -8,6 +8,7 @@ import org.signal.libsignal.protocol.incrementalmac.ChunkSizeChoice
|
||||
import org.signal.libsignal.protocol.logging.Log
|
||||
import org.whispersystems.signalservice.api.crypto.DigestingOutputStream
|
||||
import org.whispersystems.signalservice.api.crypto.SkippingOutputStream
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.internal.crypto.AttachmentDigest
|
||||
import java.io.ByteArrayOutputStream
|
||||
@@ -58,7 +59,7 @@ class DigestingRequestBody(
|
||||
throw IOException("Canceled!")
|
||||
}
|
||||
outputStream.write(buffer, 0, read)
|
||||
progressListener?.onAttachmentProgress(contentLength, outputStream.totalBytesWritten)
|
||||
progressListener?.onAttachmentProgress(AttachmentTransferProgress(total = contentLength, transmitted = outputStream.totalBytesWritten))
|
||||
}
|
||||
|
||||
outputStream.flush()
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertNotNull
|
||||
import org.junit.Test
|
||||
import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil
|
||||
import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress
|
||||
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment
|
||||
import org.whispersystems.signalservice.internal.util.Util
|
||||
import java.io.ByteArrayInputStream
|
||||
@@ -80,7 +81,7 @@ class DigestingRequestBodyTest {
|
||||
contentLength = CONTENT_LENGTH.toLong(),
|
||||
incremental = false,
|
||||
progressListener = object : SignalServiceAttachment.ProgressListener {
|
||||
override fun onAttachmentProgress(total: Long, progress: Long) {
|
||||
override fun onAttachmentProgress(progress: AttachmentTransferProgress) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user