diff --git a/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt b/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt index 16a2970eaf..ade12ec818 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/attachments/AttachmentUploadUtil.kt @@ -9,6 +9,7 @@ import android.content.Context import android.graphics.Bitmap import android.os.Build import org.signal.core.util.logging.Log +import org.signal.core.util.mebiBytes import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.blurhash.BlurHashEncoder import org.thoughtcrime.securesms.mms.PartAuthority @@ -27,6 +28,11 @@ object AttachmentUploadUtil { private val TAG = Log.tag(AttachmentUploadUtil::class.java) + /** + * Foreground notification shows while uploading attachments larger than this. + */ + val FOREGROUND_LIMIT_BYTES: Long = 10.mebiBytes.inWholeBytes + /** * Builds a [SignalServiceAttachmentStream] from the provided data, which can then be provided to various upload methods. */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt index 7377a9a867..fc26147a00 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt @@ -26,6 +26,7 @@ import org.thoughtcrime.securesms.jobs.BackfillDigestJob import org.thoughtcrime.securesms.jobs.UploadAttachmentToArchiveJob import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.keyvalue.protos.ArchiveUploadProgressState +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import java.util.concurrent.ConcurrentHashMap import kotlin.math.max import kotlin.time.Duration.Companion.milliseconds @@ -143,12 +144,12 @@ object ArchiveUploadProgress { } } - fun onMessageBackupUploadProgress(totalBytes: Long, bytesUploaded: Long) { + fun onMessageBackupUploadProgress(progress: AttachmentTransferProgress) { updateState { it.copy( state = ArchiveUploadProgressState.State.UploadBackupFile, - backupFileUploadedBytes = bytesUploaded, - backupFileTotalBytes = totalBytes + backupFileUploadedBytes = progress.transmitted.inWholeBytes, + backupFileTotalBytes = progress.total.inWholeBytes ) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/events/PartProgressEvent.java b/app/src/main/java/org/thoughtcrime/securesms/events/PartProgressEvent.java index 1c669d825d..738bee4e06 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/events/PartProgressEvent.java +++ b/app/src/main/java/org/thoughtcrime/securesms/events/PartProgressEvent.java @@ -3,6 +3,7 @@ package org.thoughtcrime.securesms.events; import androidx.annotation.NonNull; import org.thoughtcrime.securesms.attachments.Attachment; +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress; public final class PartProgressEvent { @@ -22,4 +23,11 @@ public final class PartProgressEvent { this.total = total; this.progress = progress; } + + public PartProgressEvent(@NonNull Attachment attachment, @NonNull Type type, @NonNull AttachmentTransferProgress progress) { + this.attachment = attachment; + this.type = type; + this.total = progress.getTotal().getInWholeBytes(); + this.progress = progress.getTransmitted().getInWholeBytes(); + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentCompressionJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentCompressionJob.java index 2f8500f4fc..b29fa14a4a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentCompressionJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentCompressionJob.java @@ -271,7 +271,7 @@ public final class AttachmentCompressionJob extends BaseJob { try (OutputStream outputStream = ModernEncryptingPartOutputStream.createFor(attachmentSecret, file, true).second) { mdatLength = (int) transcoder.transcode(percent -> { if (notification != null) { - notification.setProgress(percent / 100f); + notification.updateProgress(percent / 100f); } eventBus.postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.COMPRESSION, diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt index 319062c94f..daccdf257e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt @@ -36,6 +36,7 @@ import org.thoughtcrime.securesms.transport.RetryLaterException import org.thoughtcrime.securesms.util.AttachmentUtil import org.thoughtcrime.securesms.util.RemoteConfig import org.thoughtcrime.securesms.util.Util +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId @@ -279,8 +280,8 @@ class AttachmentDownloadJob private constructor( val pointer = createAttachmentPointer(attachment) val progressListener = object : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) { - EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { + EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress)) } override fun shouldCancel(): Boolean { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt index 5be31299a9..219c2a884d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt @@ -11,7 +11,6 @@ import org.signal.core.util.Base64 import org.signal.core.util.concurrent.SignalExecutors import org.signal.core.util.inRoundedDays import org.signal.core.util.logging.Log -import org.signal.core.util.mebiBytes import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.attachments.Attachment @@ -34,6 +33,7 @@ import org.thoughtcrime.securesms.service.AttachmentProgressService import org.thoughtcrime.securesms.util.RemoteConfig import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStream import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResumableUploadResponseCodeException @@ -65,11 +65,6 @@ class AttachmentUploadJob private constructor( val UPLOAD_REUSE_THRESHOLD = 3.days.inWholeMilliseconds - /** - * Foreground notification shows while uploading attachments above this. - */ - private val FOREGROUND_LIMIT = 10.mebiBytes.inWholeBytes - @JvmStatic val maxPlaintextSize: Long get() { @@ -225,7 +220,7 @@ class AttachmentUploadJob private constructor( } private fun getAttachmentNotificationIfNeeded(attachment: Attachment): AttachmentProgressService.Controller? { - return if (attachment.size >= FOREGROUND_LIMIT) { + return if (attachment.size >= AttachmentUploadUtil.FOREGROUND_LIMIT_BYTES) { AttachmentProgressService.start(context, context.getString(R.string.AttachmentUploadJob_uploading_media)) } else { null @@ -264,17 +259,10 @@ class AttachmentUploadJob private constructor( uploadSpec = resumableUploadSpec, cancellationSignal = { isCanceled }, progressListener = object : SignalServiceAttachment.ProgressListener { - private var lastUpdate = 0L - private val updateRate = 500.milliseconds.inWholeMilliseconds - - override fun onAttachmentProgress(total: Long, progress: Long) { - val now = System.currentTimeMillis() - if (now < lastUpdate || lastUpdate + updateRate < now || progress >= total) { - SignalExecutors.BOUNDED_IO.execute { - EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) - notification?.progress = (progress.toFloat() / total) - } - lastUpdate = now + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { + SignalExecutors.BOUNDED_IO.execute { + EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress)) + notification?.updateProgress(progress.value) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt index 7921c21804..0a9de1f196 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt @@ -23,6 +23,7 @@ import org.thoughtcrime.securesms.jobs.protos.BackupMessagesJobData import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.providers.BlobProvider import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.internal.push.AttachmentUploadForm import java.io.File @@ -138,8 +139,8 @@ class BackupMessagesJob private constructor( } val progressListener = object : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) { - ArchiveUploadProgress.onMessageBackupUploadProgress(total, progress) + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { + ArchiveUploadProgress.onMessageBackupUploadProgress(progress) } override fun shouldCancel(): Boolean = isCanceled diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreJob.kt index d31f9480b9..b01479b065 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupRestoreJob.kt @@ -6,7 +6,6 @@ package org.thoughtcrime.securesms.jobs import org.greenrobot.eventbus.EventBus -import org.signal.core.util.bytes import org.signal.core.util.logging.Log import org.signal.libsignal.zkgroup.profiles.ProfileKey import org.thoughtcrime.securesms.R @@ -23,6 +22,7 @@ import org.thoughtcrime.securesms.providers.BlobProvider import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.service.BackupProgressService import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener import java.io.IOException @@ -71,13 +71,13 @@ class BackupRestoreJob private constructor(parameters: Parameters) : BaseJob(par SignalStore.backup.restoreState = RestoreState.RESTORING_DB val progressListener = object : ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) { + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { controller.update( title = context.getString(R.string.BackupProgressService_title_downloading), - progress = progress.toFloat() / total.toFloat(), + progress = progress.value, indeterminate = false ) - EventBus.getDefault().post(RestoreV2Event(RestoreV2Event.Type.PROGRESS_DOWNLOAD, progress.bytes, total.bytes)) + EventBus.getDefault().post(RestoreV2Event(RestoreV2Event.Type.PROGRESS_DOWNLOAD, progress.transmitted, progress.total)) } override fun shouldCancel() = isCanceled diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupSubscriptionCheckJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupSubscriptionCheckJob.kt index f397b09f0b..4be4f369ac 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupSubscriptionCheckJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupSubscriptionCheckJob.kt @@ -165,7 +165,7 @@ class BackupSubscriptionCheckJob private constructor(parameters: Parameters) : C */ private fun checkForFailedOrCanceledSubscriptionState(activeSubscription: ActiveSubscription?) { val containsFailedPaymentOrCancellation = activeSubscription?.isFailedPayment == true || activeSubscription?.isCanceled == true - if (containsFailedPaymentOrCancellation && activeSubscription.activeSubscription != null) { + if (containsFailedPaymentOrCancellation && activeSubscription?.activeSubscription != null) { Log.i(TAG, "Subscription either has a payment failure or has been canceled.") val response = SignalNetwork.account.whoAmI() diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java index 221842c75e..ab9131783d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushSendJob.java @@ -16,8 +16,6 @@ import androidx.annotation.Nullable; import com.annimon.stream.Stream; import org.greenrobot.eventbus.EventBus; -import org.greenrobot.eventbus.Subscribe; -import org.greenrobot.eventbus.ThreadMode; import org.signal.core.util.Hex; import org.signal.core.util.logging.Log; import org.signal.libsignal.metadata.certificate.InvalidCertificateException; @@ -67,6 +65,7 @@ import org.thoughtcrime.securesms.util.RemoteConfig; import org.thoughtcrime.securesms.util.ImageCompressionUtil; import org.thoughtcrime.securesms.util.MediaUtil; import org.thoughtcrime.securesms.util.Util; +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId; @@ -90,7 +89,6 @@ import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -219,8 +217,8 @@ public abstract class PushSendJob extends SendJob { .withResumableUploadSpec(AppDependencies.getSignalServiceMessageSender().getResumableUploadSpec()) .withListener(new SignalServiceAttachment.ProgressListener() { @Override - public void onAttachmentProgress(long total, long progress) { - EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)); + public void onAttachmentProgress(@NonNull AttachmentTransferProgress progress) { + EventBus.getDefault().postSticky(new PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress)); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt index c70ab070cc..8b868e0251 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt @@ -38,6 +38,7 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels import org.thoughtcrime.securesms.notifications.NotificationIds import org.thoughtcrime.securesms.transport.RetryLaterException import org.thoughtcrime.securesms.util.RemoteConfig +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.push.exceptions.MissingConfigurationException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException @@ -230,8 +231,8 @@ class RestoreAttachmentJob private constructor( val pointer = attachment.createArchiveAttachmentPointer(useArchiveCdn) val progressListener = object : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) { - EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { + EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, progress)) } override fun shouldCancel(): Boolean { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt index 09763eb588..935e0efdbf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentThumbnailJob.kt @@ -20,6 +20,7 @@ import org.thoughtcrime.securesms.jobmanager.JsonJobData import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.util.RemoteConfig +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.api.push.exceptions.MissingConfigurationException import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException @@ -122,7 +123,7 @@ class RestoreAttachmentThumbnailJob private constructor( val thumbnailFile: File = SignalDatabase.attachments.createArchiveThumbnailTransferFile() val progressListener = object : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) = Unit + override fun onAttachmentProgress(progress: AttachmentTransferProgress) = Unit override fun shouldCancel(): Boolean = this@RestoreAttachmentThumbnailJob.isCanceled } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 82ae35f189..c27a881b61 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -11,6 +11,7 @@ import org.signal.core.util.isNotNullOrBlank import org.signal.core.util.logging.Log import org.signal.core.util.readLength import org.signal.protos.resumableuploads.ResumableUpload +import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil import org.thoughtcrime.securesms.attachments.DatabaseAttachment @@ -24,9 +25,11 @@ import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobs.protos.UploadAttachmentToArchiveJobData import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.net.SignalNetwork +import org.thoughtcrime.securesms.service.AttachmentProgressService import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import java.io.FileNotFoundException import java.io.IOException @@ -49,7 +52,7 @@ class UploadAttachmentToArchiveJob private constructor( companion object { private val TAG = Log.tag(UploadAttachmentToArchiveJob::class) const val KEY = "UploadAttachmentToArchiveJob" - const val MAX_JOB_QUEUES = 2 + private const val MAX_JOB_QUEUES = 2 /** * This randomly selects between one of [MAX_JOB_QUEUES] queues. It's a fun way of limiting the concurrency of the upload jobs to @@ -151,6 +154,12 @@ class UploadAttachmentToArchiveJob private constructor( Log.d(TAG, "[$attachmentId] Already have an upload spec. Continuing...") } + val progressServiceController = if (attachment.size >= AttachmentUploadUtil.FOREGROUND_LIMIT_BYTES) { + AttachmentProgressService.start(context, context.getString(R.string.AttachmentUploadJob_uploading_media)) + } else { + null + } + val attachmentStream = try { AttachmentUploadUtil.buildSignalServiceAttachmentStream( context = context, @@ -158,7 +167,11 @@ class UploadAttachmentToArchiveJob private constructor( uploadSpec = uploadSpec!!, cancellationSignal = { this.isCanceled }, progressListener = object : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) = ArchiveUploadProgress.onAttachmentProgress(attachmentId, progress) + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { + ArchiveUploadProgress.onAttachmentProgress(attachmentId, progress.transmitted.inWholeBytes) + progressServiceController?.updateProgress(progress.value) + } + override fun shouldCancel() = this@UploadAttachmentToArchiveJob.isCanceled } ) @@ -172,34 +185,37 @@ class UploadAttachmentToArchiveJob private constructor( } Log.d(TAG, "[$attachmentId] Beginning upload...") - val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) { - is NetworkResult.Success -> result.result - is NetworkResult.ApplicationError -> throw result.throwable - is NetworkResult.NetworkError -> { - Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception) + progressServiceController.use { + val uploadResult: AttachmentUploadResult = when (val result = SignalNetwork.attachments.uploadAttachmentV4(attachmentStream)) { + is NetworkResult.Success -> result.result + is NetworkResult.ApplicationError -> throw result.throwable + is NetworkResult.NetworkError -> { + Log.w(TAG, "[$attachmentId] Failed to upload due to network error.", result.exception) - if (result.exception.cause is ProtocolException) { - Log.w(TAG, "[$attachmentId] Length may be incorrect. Recalculating.", result.exception) + if (result.exception.cause is ProtocolException) { + Log.w(TAG, "[$attachmentId] Length may be incorrect. Recalculating.", result.exception) - val actualLength = SignalDatabase.attachments.getAttachmentStream(attachmentId, 0).readLength() - if (actualLength != attachment.size) { - Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception) - SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength) - } else { - Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.") + val actualLength = SignalDatabase.attachments.getAttachmentStream(attachmentId, 0).readLength() + if (actualLength != attachment.size) { + Log.w(TAG, "[$attachmentId] Length was incorrect! Will update. Previous: ${attachment.size}, Newly-Calculated: $actualLength", result.exception) + SignalDatabase.attachments.updateAttachmentLength(attachmentId, actualLength) + } else { + Log.i(TAG, "[$attachmentId] Length was correct. No action needed. Will retry.") + } } + + return Result.retry(defaultBackoff()) } - return Result.retry(defaultBackoff()) + is NetworkResult.StatusCodeError -> { + Log.w(TAG, "[$attachmentId] Failed to upload due to status code error. Code: ${result.code}", result.exception) + return Result.retry(defaultBackoff()) + } } - is NetworkResult.StatusCodeError -> { - Log.w(TAG, "[$attachmentId] Failed to upload due to status code error. Code: ${result.code}", result.exception) - return Result.retry(defaultBackoff()) - } - } - Log.d(TAG, "[$attachmentId] Upload complete!") - SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult) + Log.d(TAG, "[$attachmentId] Upload complete!") + SignalDatabase.attachments.finalizeAttachmentAfterUpload(attachment.attachmentId, uploadResult) + } if (!isCanceled) { AppDependencies.jobManager.add(CopyAttachmentToArchiveJob(attachment.attachmentId)) diff --git a/app/src/main/java/org/thoughtcrime/securesms/service/AttachmentProgressService.kt b/app/src/main/java/org/thoughtcrime/securesms/service/AttachmentProgressService.kt index 0ea40e1e68..944c27db57 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/service/AttachmentProgressService.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/service/AttachmentProgressService.kt @@ -10,8 +10,15 @@ import android.app.PendingIntent import android.content.Context import android.content.Intent import androidx.core.app.NotificationCompat +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.launch import org.signal.core.util.PendingIntentFlags import org.signal.core.util.logging.Log +import org.signal.core.util.throttleLatest import org.thoughtcrime.securesms.MainActivity import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.notifications.NotificationChannels @@ -19,6 +26,7 @@ import org.thoughtcrime.securesms.notifications.NotificationIds import java.util.concurrent.CopyOnWriteArraySet import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock +import kotlin.time.Duration.Companion.milliseconds /** * A service to show attachment progress. In order to ensure we only show one status notification, @@ -60,7 +68,7 @@ class AttachmentProgressService : SafeForegroundService() { controllerLock.withLock { val started = if (controllers.isEmpty()) { Log.i(TAG, "[start] First controller. Starting.") - SafeForegroundService.start(context, AttachmentProgressService::class.java) + start(context, AttachmentProgressService::class.java) } else { Log.i(TAG, "[start] No need to start the service again. Already have an active controller.") true @@ -78,7 +86,7 @@ class AttachmentProgressService : SafeForegroundService() { } private fun stop(context: Context) { - SafeForegroundService.stop(context, AttachmentProgressService::class.java) + stop(context, AttachmentProgressService::class.java) } private fun onControllersChanged(context: Context) { @@ -132,6 +140,17 @@ class AttachmentProgressService : SafeForegroundService() { } class Controller(private val context: Context, title: String) : AutoCloseable { + private val coroutineScope = CoroutineScope(Dispatchers.IO) + private val progressFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 1) + + init { + coroutineScope.launch { + progressFlow + .throttleLatest(500.milliseconds) // avoid OS notification rate limiting + .collectLatest { progress = it } + } + } + var title: String = title set(value) { field = value @@ -139,7 +158,7 @@ class AttachmentProgressService : SafeForegroundService() { } var progress: Float = 0f - set(value) { + private set(value) { field = value indeterminate = false onControllersChanged(context) @@ -155,8 +174,13 @@ class AttachmentProgressService : SafeForegroundService() { onControllersChanged(context) } + fun updateProgress(progress: Float) { + progressFlow.tryEmit(progress) + } + override fun close() { controllerLock.withLock { + coroutineScope.cancel() controllers.remove(this) onControllersChanged(context) } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.kt index ce549de2dc..51c25e61cf 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/SignalServiceAttachment.kt @@ -4,6 +4,8 @@ */ package org.whispersystems.signalservice.api.messages +import org.signal.core.util.ByteSize +import org.signal.core.util.bytes import org.whispersystems.signalservice.internal.push.http.CancelationSignal import org.whispersystems.signalservice.internal.push.http.ResumableUploadSpec import java.io.InputStream @@ -160,11 +162,8 @@ abstract class SignalServiceAttachment protected constructor(val contentType: St interface ProgressListener { /** * Called on a progress change event. - * - * @param total The total amount to transmit/receive in bytes. - * @param progress The amount that has been transmitted/received in bytes thus far */ - fun onAttachmentProgress(total: Long, progress: Long) + fun onAttachmentProgress(progress: AttachmentTransferProgress) fun shouldCancel(): Boolean } @@ -176,3 +175,21 @@ abstract class SignalServiceAttachment protected constructor(val contentType: St } } } + +/** + * Progress status for an attachment upload/download operation. + */ +data class AttachmentTransferProgress( + /** The total amount of bytes to transmit/receive. */ + val total: ByteSize, + + /** The amount of bytes that have been transmitted/received thus far. */ + val transmitted: ByteSize +) { + constructor(total: Long, transmitted: Long) : this(total.bytes, transmitted.bytes) + + /** + * The fractional progress as a float value between 0.0 and 1.0 (inclusive). + */ + val value = transmitted.inWholeBytes.toFloat() / total.inWholeBytes +} diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index 4b13e5d808..3c381a9989 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -28,6 +28,7 @@ import org.whispersystems.signalservice.api.account.AccountAttributes; import org.whispersystems.signalservice.api.account.PreKeyCollection; import org.whispersystems.signalservice.api.crypto.SealedSenderAccess; import org.whispersystems.signalservice.api.groupsv2.GroupsV2AuthorizationString; +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress; import org.whispersystems.signalservice.api.messages.SignalServiceAttachment.ProgressListener; import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentRemoteId; import org.whispersystems.signalservice.api.messages.calls.CallingResponse; @@ -653,7 +654,7 @@ public class PushServiceSocket { if ((totalRead += read) > maxSizeBytes) throw new PushNetworkException("Response exceeded max size!"); if (listener != null) { - listener.onAttachmentProgress(body.contentLength() + offset, totalRead); + listener.onAttachmentProgress(new AttachmentTransferProgress(body.contentLength() + offset, totalRead)); if (listener.shouldCancel()) { call.cancel(); throw new PushNetworkException("Canceled by listener check."); diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt index 99c957e85c..fe4930f5de 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt @@ -8,6 +8,7 @@ import org.signal.libsignal.protocol.incrementalmac.ChunkSizeChoice import org.signal.libsignal.protocol.logging.Log import org.whispersystems.signalservice.api.crypto.DigestingOutputStream import org.whispersystems.signalservice.api.crypto.SkippingOutputStream +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.internal.crypto.AttachmentDigest import java.io.ByteArrayOutputStream @@ -58,7 +59,7 @@ class DigestingRequestBody( throw IOException("Canceled!") } outputStream.write(buffer, 0, read) - progressListener?.onAttachmentProgress(contentLength, outputStream.totalBytesWritten) + progressListener?.onAttachmentProgress(AttachmentTransferProgress(total = contentLength, transmitted = outputStream.totalBytesWritten)) } outputStream.flush() diff --git a/libsignal-service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.kt b/libsignal-service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.kt index e030b1c154..fe115f265a 100644 --- a/libsignal-service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.kt +++ b/libsignal-service/src/test/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBodyTest.kt @@ -11,6 +11,7 @@ import org.junit.Assert.assertEquals import org.junit.Assert.assertNotNull import org.junit.Test import org.whispersystems.signalservice.api.crypto.AttachmentCipherStreamUtil +import org.whispersystems.signalservice.api.messages.AttachmentTransferProgress import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.internal.util.Util import java.io.ByteArrayInputStream @@ -80,7 +81,7 @@ class DigestingRequestBodyTest { contentLength = CONTENT_LENGTH.toLong(), incremental = false, progressListener = object : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) { + override fun onAttachmentProgress(progress: AttachmentTransferProgress) { // no-op }