diff --git a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt index cee5566ee2..344f2e7c0f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/backup/ArchiveUploadProgress.kt @@ -5,21 +5,20 @@ package org.thoughtcrime.securesms.backup -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.shareIn +import org.signal.core.util.logging.Log import org.signal.core.util.throttleLatest +import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.keyvalue.protos.ArchiveUploadProgressState -import org.whispersystems.signalservice.api.messages.SignalServiceAttachment +import java.util.concurrent.ConcurrentHashMap import kotlin.math.max import kotlin.time.Duration.Companion.milliseconds @@ -28,6 +27,8 @@ import kotlin.time.Duration.Companion.milliseconds */ object ArchiveUploadProgress { + private val TAG = Log.tag(ArchiveUploadProgress::class) + private val PROGRESS_NONE = ArchiveUploadProgressState( state = ArchiveUploadProgressState.State.None ) @@ -36,18 +37,30 @@ object ArchiveUploadProgress { private var uploadProgress: ArchiveUploadProgressState = SignalStore.backup.archiveUploadState ?: PROGRESS_NONE + private val partialMediaProgress: MutableMap = ConcurrentHashMap() + /** * Observe this to get updates on the current upload progress. */ val progress: Flow = _progress - .throttleLatest(500.milliseconds) + .throttleLatest(500.milliseconds) { + uploadProgress.state == ArchiveUploadProgressState.State.None || + (uploadProgress.state == ArchiveUploadProgressState.State.UploadBackupFile && uploadProgress.backupFileUploadedBytes == 0L) || + (uploadProgress.state == ArchiveUploadProgressState.State.UploadMedia && uploadProgress.mediaUploadedBytes == 0L) + } .map { - if (uploadProgress.state != ArchiveUploadProgressState.State.UploadingAttachments) { + if (uploadProgress.state != ArchiveUploadProgressState.State.UploadMedia) { return@map uploadProgress } - val pendingCount = SignalDatabase.attachments.getPendingArchiveUploadCount() - if (pendingCount == uploadProgress.totalAttachments) { + if (!SignalStore.backup.backsUpMedia) { + Log.i(TAG, "Doesn't upload media. Done!") + return@map PROGRESS_NONE + } + + val pendingMediaUploadBytes = SignalDatabase.attachments.getPendingArchiveUploadBytes() - partialMediaProgress.values.sum() + if (pendingMediaUploadBytes <= 0) { + Log.i(TAG, "No more pending bytes. Done!") return@map PROGRESS_NONE } @@ -55,90 +68,96 @@ object ArchiveUploadProgress { // If we wanted the most accurate progress possible, we could maintain a new database flag that indicates whether an attachment has been flagged as part // of the current upload batch. However, this gets us pretty close while keeping things simple and not having to juggle extra flags, with the caveat that // the progress bar may occasionally be including media that is not actually referenced in the active backup file. - val totalCount = max(uploadProgress.totalAttachments, pendingCount) + val totalMediaUploadBytes = max(uploadProgress.mediaTotalBytes, pendingMediaUploadBytes) ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.UploadingAttachments, - completedAttachments = totalCount - pendingCount, - totalAttachments = totalCount + state = ArchiveUploadProgressState.State.UploadMedia, + mediaUploadedBytes = totalMediaUploadBytes - pendingMediaUploadBytes, + mediaTotalBytes = totalMediaUploadBytes ) } - .onEach { - updateState(it, notify = false) + .onEach { updated -> + updateState(notify = false) { updated } } .flowOn(Dispatchers.IO) - .shareIn(scope = CoroutineScope(Dispatchers.IO), started = SharingStarted.WhileSubscribed(), replay = 1) val inProgress get() = uploadProgress.state != ArchiveUploadProgressState.State.None fun begin() { - updateState( + updateState { ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages + state = ArchiveUploadProgressState.State.Export ) - ) + } } - fun onMessageBackupCreated() { - updateState( - ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.UploadingMessages + fun onMessageBackupCreated(backupFileSize: Long) { + updateState { + it.copy( + state = ArchiveUploadProgressState.State.UploadBackupFile, + backupFileTotalBytes = backupFileSize, + backupFileUploadedBytes = 0 ) - ) + } } - fun onAttachmentsStarted(attachmentCount: Long) { - updateState( - ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.UploadingAttachments, - completedAttachments = 0, - totalAttachments = attachmentCount + fun onMessageBackupUploadProgress(totalBytes: Long, bytesUploaded: Long) { + updateState { + it.copy( + state = ArchiveUploadProgressState.State.UploadBackupFile, + backupFileUploadedBytes = bytesUploaded, + backupFileTotalBytes = totalBytes ) - ) + } } - fun onAttachmentFinished() { + fun onAttachmentsStarted(totalAttachmentBytes: Long) { + updateState { + it.copy( + state = ArchiveUploadProgressState.State.UploadMedia, + mediaUploadedBytes = 0, + mediaTotalBytes = totalAttachmentBytes + ) + } + } + + fun onAttachmentProgress(attachmentId: AttachmentId, bytesUploaded: Long) { + partialMediaProgress[attachmentId] = bytesUploaded + _progress.tryEmit(Unit) + } + + fun onAttachmentFinished(attachmentId: AttachmentId) { + partialMediaProgress.remove(attachmentId) _progress.tryEmit(Unit) } fun onMessageBackupFinishedEarly() { - updateState(PROGRESS_NONE) + updateState { PROGRESS_NONE } } fun onValidationFailure() { - updateState(PROGRESS_NONE) + updateState { PROGRESS_NONE } } fun onMainBackupFileUploadFailure() { - updateState(PROGRESS_NONE) + updateState { PROGRESS_NONE } } - private fun updateState(state: ArchiveUploadProgressState, notify: Boolean = true) { - uploadProgress = state - SignalStore.backup.archiveUploadState = state + private fun updateState(notify: Boolean = true, transform: (ArchiveUploadProgressState) -> ArchiveUploadProgressState) { + val newState = transform(uploadProgress) + if (uploadProgress == newState) { + return + } + + uploadProgress = newState + SignalStore.backup.archiveUploadState = newState if (notify) { _progress.tryEmit(Unit) } } - class ArchiveUploadProgressListener( - private val shouldCancel: () -> Boolean = { false } - ) : SignalServiceAttachment.ProgressListener { - override fun onAttachmentProgress(total: Long, progress: Long) { - updateState( - state = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.UploadingMessages, - totalAttachments = total, - completedAttachments = progress - ) - ) - } - - override fun shouldCancel(): Boolean = shouldCancel() - } - object ArchiveBackupProgressListener : BackupRepository.ExportProgressListener { override fun onAccount() { updatePhase(ArchiveUploadProgressState.BackupPhase.Account) @@ -178,17 +197,17 @@ object ArchiveUploadProgress { private fun updatePhase( phase: ArchiveUploadProgressState.BackupPhase, - completedObjects: Long = 0L, - totalObjects: Long = 0L + exportedFrames: Long = 0L, + totalFrames: Long = 0L ) { - updateState( - state = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + updateState { + ArchiveUploadProgressState( + state = ArchiveUploadProgressState.State.Export, backupPhase = phase, - completedAttachments = completedObjects, - totalAttachments = totalObjects + frameExportCount = exportedFrames, + frameTotalCount = totalFrames ) - ) + } } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsFragment.kt index 2056337f21..a7bf14d2bb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsFragment.kt @@ -11,7 +11,8 @@ import android.widget.Toast import androidx.activity.result.ActivityResultLauncher import androidx.biometric.BiometricManager import androidx.biometric.BiometricPrompt -import androidx.compose.animation.AnimatedContent +import androidx.compose.animation.core.animateFloatAsState +import androidx.compose.animation.core.tween import androidx.compose.foundation.Image import androidx.compose.foundation.background import androidx.compose.foundation.border @@ -64,6 +65,7 @@ import androidx.compose.ui.text.buildAnnotatedString import androidx.compose.ui.text.font.FontWeight import androidx.compose.ui.unit.dp import androidx.compose.ui.window.DialogProperties +import androidx.lifecycle.compose.collectAsStateWithLifecycle import androidx.navigation.fragment.findNavController import androidx.navigation.fragment.navArgs import org.signal.core.ui.compose.Buttons @@ -108,7 +110,6 @@ import org.thoughtcrime.securesms.util.viewModel import java.math.BigDecimal import java.util.Currency import java.util.Locale -import kotlin.math.max import kotlin.time.Duration import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.milliseconds @@ -137,7 +138,7 @@ class RemoteBackupsSettingsFragment : ComposeFragment() { @Composable override fun FragmentContent() { val state by viewModel.state.collectAsState() - val backupProgress by ArchiveUploadProgress.progress.collectAsState(initial = null) + val backupProgress by ArchiveUploadProgress.progress.collectAsStateWithLifecycle(initialValue = null) val restoreState by viewModel.restoreState.collectAsState() val callbacks = remember { Callbacks() } @@ -378,36 +379,34 @@ private fun RemoteBackupsSettingsContent( } item { - AnimatedContent(backupState, label = "backup-state-block") { state -> - when (state) { - is RemoteBackupsSettingsState.BackupState.Loading -> { - LoadingCard() - } + when (backupState) { + is RemoteBackupsSettingsState.BackupState.Loading -> { + LoadingCard() + } - is RemoteBackupsSettingsState.BackupState.Error -> { - ErrorCard() - } + is RemoteBackupsSettingsState.BackupState.Error -> { + ErrorCard() + } - is RemoteBackupsSettingsState.BackupState.Pending -> { - PendingCard(state.price) - } + is RemoteBackupsSettingsState.BackupState.Pending -> { + PendingCard(backupState.price) + } - is RemoteBackupsSettingsState.BackupState.SubscriptionMismatchMissingGooglePlay -> { - SubscriptionMismatchMissingGooglePlayCard( - state = state, - onLearnMoreClick = contentCallbacks::onLearnMoreAboutLostSubscription, - onRenewClick = contentCallbacks::onRenewLostSubscription - ) - } + is RemoteBackupsSettingsState.BackupState.SubscriptionMismatchMissingGooglePlay -> { + SubscriptionMismatchMissingGooglePlayCard( + state = backupState, + onLearnMoreClick = contentCallbacks::onLearnMoreAboutLostSubscription, + onRenewClick = contentCallbacks::onRenewLostSubscription + ) + } - RemoteBackupsSettingsState.BackupState.None -> Unit + RemoteBackupsSettingsState.BackupState.None -> Unit - is RemoteBackupsSettingsState.BackupState.WithTypeAndRenewalTime -> { - BackupCard( - backupState = state, - onBackupTypeActionButtonClicked = contentCallbacks::onBackupTypeActionClick - ) - } + is RemoteBackupsSettingsState.BackupState.WithTypeAndRenewalTime -> { + BackupCard( + backupState = backupState, + onBackupTypeActionButtonClicked = contentCallbacks::onBackupTypeActionClick + ) } } } @@ -993,14 +992,26 @@ private fun InProgressBackupRow( Column( modifier = Modifier.weight(1f) ) { - val backupProgress = getBackupProgress(archiveUploadProgressState) - if (backupProgress.total == 0L) { - LinearProgressIndicator(modifier = Modifier.fillMaxWidth()) - } else { - LinearProgressIndicator( - modifier = Modifier.fillMaxWidth(), - progress = { backupProgress.progress } - ) + when (archiveUploadProgressState.state) { + ArchiveUploadProgressState.State.None -> { + LinearProgressIndicator(modifier = Modifier.fillMaxWidth()) + } + ArchiveUploadProgressState.State.Export -> { + val progressValue by animateFloatAsState(targetValue = archiveUploadProgressState.frameExportProgress(), animationSpec = tween(durationMillis = 250)) + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth(), + progress = { progressValue }, + drawStopIndicator = {} + ) + } + ArchiveUploadProgressState.State.UploadBackupFile, ArchiveUploadProgressState.State.UploadMedia -> { + val progressValue by animateFloatAsState(targetValue = archiveUploadProgressState.uploadProgress(), animationSpec = tween(durationMillis = 250)) + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth(), + progress = { progressValue }, + drawStopIndicator = {} + ) + } } Text( @@ -1012,33 +1023,26 @@ private fun InProgressBackupRow( } } -private fun getBackupProgress(state: ArchiveUploadProgressState): BackupProgress { - val approximateMessageCount = max(state.completedAttachments, state.totalAttachments) - return BackupProgress(state.completedAttachments, approximateMessageCount) -} - @Composable private fun getProgressStateMessage(archiveUploadProgressState: ArchiveUploadProgressState): String { return when (archiveUploadProgressState.state) { ArchiveUploadProgressState.State.None -> stringResource(R.string.RemoteBackupsSettingsFragment__processing_backup) - ArchiveUploadProgressState.State.BackingUpMessages -> getBackupPhaseMessage(archiveUploadProgressState) - ArchiveUploadProgressState.State.UploadingMessages -> getUploadingMessages(archiveUploadProgressState) - ArchiveUploadProgressState.State.UploadingAttachments -> getUploadingAttachmentsMessage(archiveUploadProgressState) + ArchiveUploadProgressState.State.Export -> getBackupExportPhaseProgressString(archiveUploadProgressState) + ArchiveUploadProgressState.State.UploadBackupFile, ArchiveUploadProgressState.State.UploadMedia -> getBackupUploadPhaseProgressString(archiveUploadProgressState) } } @Composable -private fun getBackupPhaseMessage(state: ArchiveUploadProgressState): String { +private fun getBackupExportPhaseProgressString(state: ArchiveUploadProgressState): String { return when (state.backupPhase) { ArchiveUploadProgressState.BackupPhase.BackupPhaseNone -> stringResource(R.string.RemoteBackupsSettingsFragment__processing_backup) ArchiveUploadProgressState.BackupPhase.Message -> { - val progress = getBackupProgress(state) pluralStringResource( - R.plurals.RemoteBackupsSettingsFragment__processing_d_of_d_d_messages, - progress.total.toInt(), - "%,d".format(progress.completed.toInt()), - "%,d".format(progress.total.toInt()), - (progress.progress * 100).toInt() + R.plurals.RemoteBackupsSettingsFragment__processing_messages_progress_text, + state.frameTotalCount.toInt(), + "%,d".format(state.frameExportCount), + "%,d".format(state.frameTotalCount), + (state.frameExportProgress() * 100).toInt() ) } @@ -1047,25 +1051,12 @@ private fun getBackupPhaseMessage(state: ArchiveUploadProgressState): String { } @Composable -private fun getUploadingMessages(state: ArchiveUploadProgressState): String { - val formattedCompleted = state.completedAttachments.bytes.toUnitString() - val formattedTotal = state.totalAttachments.bytes.toUnitString() - val percent = if (state.totalAttachments == 0L) { - 0 - } else { - ((state.completedAttachments / state.totalAttachments.toFloat()) * 100).toInt() - } +private fun getBackupUploadPhaseProgressString(state: ArchiveUploadProgressState): String { + val formattedTotalBytes = state.uploadBytesTotal.bytes.toUnitString() + val formattedUploadedBytes = state.uploadBytesUploaded.bytes.toUnitString() + val percent = (state.uploadProgress() * 100).toInt() - return stringResource(R.string.RemoteBackupsSettingsFragment__uploading_s_of_s_d, formattedCompleted, formattedTotal, percent) -} - -@Composable -private fun getUploadingAttachmentsMessage(state: ArchiveUploadProgressState): String { - return if (state.totalAttachments == 0L) { - stringResource(R.string.RemoteBackupsSettingsFragment__processing_backup) - } else { - stringResource(R.string.RemoteBackupsSettingsFragment__d_slash_d, state.completedAttachments, state.totalAttachments) - } + return stringResource(R.string.RemoteBackupsSettingsFragment__uploading_s_of_s_d, formattedUploadedBytes, formattedTotalBytes, percent) } @Composable @@ -1475,70 +1466,82 @@ private fun InProgressRowPreview() { InProgressBackupRow(archiveUploadProgressState = ArchiveUploadProgressState()) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.BackupPhaseNone ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Account ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Call ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Sticker ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Recipient ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Thread ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Message, - completedAttachments = 1, - totalAttachments = 1 + frameExportCount = 1, + frameTotalCount = 1 ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Message, - completedAttachments = 1000, - totalAttachments = 100_000 + frameExportCount = 1000, + frameTotalCount = 100_000 ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.BackingUpMessages, + state = ArchiveUploadProgressState.State.Export, backupPhase = ArchiveUploadProgressState.BackupPhase.Message, - completedAttachments = 1_000_000, - totalAttachments = 100_000 + frameExportCount = 1_000_000, + frameTotalCount = 100_000 ) ) InProgressBackupRow( archiveUploadProgressState = ArchiveUploadProgressState( - state = ArchiveUploadProgressState.State.UploadingMessages, + state = ArchiveUploadProgressState.State.UploadBackupFile, backupPhase = ArchiveUploadProgressState.BackupPhase.BackupPhaseNone, - completedAttachments = 1.gibiBytes.inWholeBytes + 100.mebiBytes.inWholeBytes, - totalAttachments = 12.gibiBytes.inWholeBytes + backupFileUploadedBytes = 10.mebiBytes.inWholeBytes, + backupFileTotalBytes = 50.mebiBytes.inWholeBytes, + mediaUploadedBytes = 0, + mediaTotalBytes = 0 + ) + ) + InProgressBackupRow( + archiveUploadProgressState = ArchiveUploadProgressState( + state = ArchiveUploadProgressState.State.UploadMedia, + backupPhase = ArchiveUploadProgressState.BackupPhase.BackupPhaseNone, + backupFileUploadedBytes = 10.mebiBytes.inWholeBytes, + backupFileTotalBytes = 50.mebiBytes.inWholeBytes, + mediaUploadedBytes = 100.mebiBytes.inWholeBytes, + mediaTotalBytes = 1.gibiBytes.inWholeBytes ) ) } @@ -1614,3 +1617,28 @@ private data class BackupProgress( ) { val progress: Float = if (total > 0) completed / total.toFloat() else 0f } + +private fun ArchiveUploadProgressState.frameExportProgress(): Float { + return if (this.frameTotalCount == 0L) { + 0f + } else { + this.frameExportCount / this.frameTotalCount.toFloat() + } +} + +private fun ArchiveUploadProgressState.uploadProgress(): Float { + val current = this.backupFileUploadedBytes + this.mediaUploadedBytes + val total = this.backupFileTotalBytes + this.mediaTotalBytes + + return if (total == 0L) { + 0f + } else { + current / total.toFloat() + } +} + +private val ArchiveUploadProgressState.uploadBytesTotal: Long + get() = this.backupFileTotalBytes + this.mediaTotalBytes + +private val ArchiveUploadProgressState.uploadBytesUploaded: Long + get() = this.backupFileUploadedBytes + this.mediaUploadedBytes diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsViewModel.kt index f980520022..576ba81b6a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/app/backups/remote/RemoteBackupsSettingsViewModel.kt @@ -25,6 +25,7 @@ import org.signal.core.util.bytes import org.signal.core.util.logging.Log import org.signal.core.util.money.FiatMoney import org.signal.donations.InAppPaymentType +import org.thoughtcrime.securesms.backup.ArchiveUploadProgress import org.thoughtcrime.securesms.backup.v2.BackupFrequency import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.backup.v2.MessageBackupTier @@ -41,6 +42,7 @@ import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobs.BackupMessagesJob import org.thoughtcrime.securesms.jobs.RestoreOptimizedMediaJob import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.keyvalue.protos.ArchiveUploadProgressState import org.thoughtcrime.securesms.service.MessageBackupListener import java.util.Currency import kotlin.time.Duration.Companion.seconds @@ -103,6 +105,20 @@ class RemoteBackupsSettingsViewModel : ViewModel() { delay(1.seconds) } } + + viewModelScope.launch { + var previous: ArchiveUploadProgressState.State? = null + ArchiveUploadProgress.progress + .collect { current -> + if (previous != null && current.state == ArchiveUploadProgressState.State.None) { + _state.update { + it.copy(lastBackupTimestamp = SignalStore.backup.lastBackupTime) + } + refreshState(null) + } + previous = current.state + } + } } fun setCanBackUpUsingCellular(canBackUpUsingCellular: Boolean) { @@ -154,6 +170,42 @@ class RemoteBackupsSettingsViewModel : ViewModel() { } } + fun turnOffAndDeleteBackups() { + viewModelScope.launch { + Log.d(TAG, "Beginning to turn off and delete backup.") + requestDialog(RemoteBackupsSettingsState.Dialog.PROGRESS_SPINNER) + + val hasMediaBackupUploaded = SignalStore.backup.backsUpMedia && SignalStore.backup.hasBackupBeenUploaded + + val succeeded = withContext(Dispatchers.IO) { + BackupRepository.turnOffAndDisableBackups() + } + + if (isActive) { + if (succeeded) { + if (hasMediaBackupUploaded && SignalStore.backup.optimizeStorage) { + Log.d(TAG, "User has optimized storage, downloading.") + requestDialog(RemoteBackupsSettingsState.Dialog.DOWNLOADING_YOUR_BACKUP) + + SignalStore.backup.optimizeStorage = false + RestoreOptimizedMediaJob.enqueue() + } else { + Log.d(TAG, "User does not have optimized storage, finished.") + requestDialog(RemoteBackupsSettingsState.Dialog.NONE) + } + refresh() + } else { + Log.d(TAG, "Failed to disable backups.") + requestDialog(RemoteBackupsSettingsState.Dialog.TURN_OFF_FAILED) + } + } + } + } + + fun onBackupNowClick() { + BackupMessagesJob.enqueue() + } + private suspend fun refreshState(lastPurchase: InAppPaymentTable.InAppPayment?) { val tier = SignalStore.backup.latestBackupTier @@ -307,39 +359,6 @@ class RemoteBackupsSettingsViewModel : ViewModel() { } } - fun turnOffAndDeleteBackups() { - viewModelScope.launch { - Log.d(TAG, "Beginning to turn off and delete backup.") - requestDialog(RemoteBackupsSettingsState.Dialog.PROGRESS_SPINNER) - - val hasMediaBackupUploaded = SignalStore.backup.backsUpMedia && SignalStore.backup.hasBackupBeenUploaded - - val succeeded = withContext(Dispatchers.IO) { - BackupRepository.turnOffAndDisableBackups() - } - - if (isActive) { - if (succeeded) { - if (hasMediaBackupUploaded && SignalStore.backup.optimizeStorage) { - Log.d(TAG, "User has optimized storage, downloading.") - requestDialog(RemoteBackupsSettingsState.Dialog.DOWNLOADING_YOUR_BACKUP) - - SignalStore.backup.optimizeStorage = false - RestoreOptimizedMediaJob.enqueue() - } else { - Log.d(TAG, "User does not have optimized storage, finished.") - requestDialog(RemoteBackupsSettingsState.Dialog.NONE) - } - refresh() - } else { - Log.d(TAG, "Failed to disable backups.") - requestDialog(RemoteBackupsSettingsState.Dialog.TURN_OFF_FAILED) - } - } - } - } - - fun onBackupNowClick() { - BackupMessagesJob.enqueue() + private fun refreshLocalState() { } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/settings/conversation/InternalConversationSettingsFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/components/settings/conversation/InternalConversationSettingsFragment.kt index c46cb31a42..9d3f6d08eb 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/settings/conversation/InternalConversationSettingsFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/settings/conversation/InternalConversationSettingsFragment.kt @@ -1,5 +1,6 @@ package org.thoughtcrime.securesms.components.settings.conversation +import android.graphics.Bitmap import android.graphics.Color import android.text.TextUtils import android.widget.Toast @@ -16,11 +17,14 @@ import org.signal.core.util.withinTransaction import org.signal.libsignal.zkgroup.profiles.ProfileKey import org.thoughtcrime.securesms.MainActivity import org.thoughtcrime.securesms.R +import org.thoughtcrime.securesms.attachments.Attachment +import org.thoughtcrime.securesms.attachments.UriAttachment import org.thoughtcrime.securesms.components.settings.DSLConfiguration import org.thoughtcrime.securesms.components.settings.DSLSettingsFragment import org.thoughtcrime.securesms.components.settings.DSLSettingsText import org.thoughtcrime.securesms.components.settings.app.subscription.InAppPaymentsRepository import org.thoughtcrime.securesms.components.settings.configure +import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.MessageType import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.database.model.InAppPaymentSubscriberRecord @@ -31,14 +35,18 @@ import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.mms.IncomingMessage import org.thoughtcrime.securesms.mms.OutgoingMessage import org.thoughtcrime.securesms.profiles.AvatarHelper +import org.thoughtcrime.securesms.providers.BlobProvider import org.thoughtcrime.securesms.recipients.Recipient import org.thoughtcrime.securesms.recipients.RecipientForeverObserver import org.thoughtcrime.securesms.recipients.RecipientId +import org.thoughtcrime.securesms.util.BitmapUtil +import org.thoughtcrime.securesms.util.MediaUtil import org.thoughtcrime.securesms.util.SpanUtil import org.thoughtcrime.securesms.util.Util import org.thoughtcrime.securesms.util.adapter.mapping.MappingAdapter import org.thoughtcrime.securesms.util.livedata.Store import java.util.Objects +import kotlin.random.Random import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.DurationUnit @@ -267,40 +275,71 @@ class InternalConversationSettingsFragment : DSLSettingsFragment( } ) - clickPref( - title = DSLSettingsText.from("Add 1,000 dummy messages"), - summary = DSLSettingsText.from("Just adds 1,000 random messages to the chat. Text-only, nothing complicated."), - onClick = { - MaterialAlertDialogBuilder(requireContext()) - .setTitle("Are you sure?") - .setNegativeButton(android.R.string.cancel) { d, _ -> d.dismiss() } - .setPositiveButton(android.R.string.ok) { _, _ -> - val messageCount = 1000 - val startTime = System.currentTimeMillis() - messageCount - SignalDatabase.rawDatabase.withinTransaction { - val targetThread = SignalDatabase.threads.getOrCreateThreadIdFor(recipient) - for (i in 1..messageCount) { - val time = startTime + i - if (Math.random() > 0.5) { + if (!recipient.isGroup) { + clickPref( + title = DSLSettingsText.from("Add 1,000 dummy messages"), + summary = DSLSettingsText.from("Just adds 1,000 random messages to the chat. Text-only, nothing complicated."), + onClick = { + MaterialAlertDialogBuilder(requireContext()) + .setTitle("Are you sure?") + .setNegativeButton(android.R.string.cancel) { d, _ -> d.dismiss() } + .setPositiveButton(android.R.string.ok) { _, _ -> + val messageCount = 1000 + val startTime = System.currentTimeMillis() - messageCount + SignalDatabase.rawDatabase.withinTransaction { + val targetThread = SignalDatabase.threads.getOrCreateThreadIdFor(recipient) + for (i in 1..messageCount) { + val time = startTime + i + if (Math.random() > 0.5) { + val id = SignalDatabase.messages.insertMessageOutbox( + message = OutgoingMessage(threadRecipient = recipient, sentTimeMillis = time, body = "Outgoing: $i"), + threadId = targetThread + ) + SignalDatabase.messages.markAsSent(id, true) + } else { + SignalDatabase.messages.insertMessageInbox( + retrieved = IncomingMessage(type = MessageType.NORMAL, from = recipient.id, sentTimeMillis = time, serverTimeMillis = time, receivedTimeMillis = System.currentTimeMillis(), body = "Incoming: $i"), + candidateThreadId = targetThread + ) + } + } + } + + Toast.makeText(context, "Done!", Toast.LENGTH_SHORT).show() + } + .show() + } + ) + + clickPref( + title = DSLSettingsText.from("Add 10 dummy messages with attachments"), + summary = DSLSettingsText.from("Adds 10 random messages to the chat with attachments of a random image. Attachments are not uploaded."), + onClick = { + MaterialAlertDialogBuilder(requireContext()) + .setTitle("Are you sure?") + .setNegativeButton(android.R.string.cancel) { d, _ -> d.dismiss() } + .setPositiveButton(android.R.string.ok) { _, _ -> + val messageCount = 10 + val startTime = System.currentTimeMillis() - messageCount + SignalDatabase.rawDatabase.withinTransaction { + val targetThread = SignalDatabase.threads.getOrCreateThreadIdFor(recipient) + for (i in 1..messageCount) { + val time = startTime + i + val attachment = makeDummyAttachment() val id = SignalDatabase.messages.insertMessageOutbox( - message = OutgoingMessage(threadRecipient = recipient, sentTimeMillis = time, body = "Outgoing: $i"), + message = OutgoingMessage(threadRecipient = recipient, sentTimeMillis = time, body = "Outgoing: $i", attachments = listOf(attachment)), threadId = targetThread ) SignalDatabase.messages.markAsSent(id, true) - } else { - SignalDatabase.messages.insertMessageInbox( - retrieved = IncomingMessage(type = MessageType.NORMAL, from = recipient.id, sentTimeMillis = time, serverTimeMillis = time, receivedTimeMillis = System.currentTimeMillis(), body = "Incoming: $i"), - candidateThreadId = targetThread - ) } } - } - Toast.makeText(context, "Done!", Toast.LENGTH_SHORT).show() - } - .show() - } - ) + Toast.makeText(context, "Done!", Toast.LENGTH_SHORT).show() + } + .show() + } + ) + } if (recipient.isSelf) { sectionHeaderPref(DSLSettingsText.from("Donations")) @@ -399,6 +438,37 @@ class InternalConversationSettingsFragment : DSLSettingsFragment( } } + private fun makeDummyAttachment(): Attachment { + val bitmapDimens = 1024 + val bitmap = Bitmap.createBitmap( + IntArray(bitmapDimens * bitmapDimens) { Random.nextInt(0xFFFFFF) }, + 0, + bitmapDimens, + bitmapDimens, + bitmapDimens, + Bitmap.Config.RGB_565 + ) + val stream = BitmapUtil.toCompressedJpeg(bitmap) + val bytes = stream.readBytes() + val uri = BlobProvider.getInstance().forData(bytes).createForSingleSessionOnDisk(requireContext()) + return UriAttachment( + uri = uri, + contentType = MediaUtil.IMAGE_JPEG, + transferState = AttachmentTable.TRANSFER_PROGRESS_DONE, + size = bytes.size.toLong(), + fileName = null, + voiceNote = false, + borderless = false, + videoGif = false, + quote = false, + caption = null, + stickerLocator = null, + blurHash = null, + audioHash = null, + transformProperties = null + ) + } + private fun copyToClipboard(text: String) { Util.copyToClipboard(requireContext(), text) Toast.makeText(requireContext(), "Copied to clipboard", Toast.LENGTH_SHORT).show() diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index 1d47fe6b4f..718e2b7a73 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -718,14 +718,20 @@ class AttachmentTable( } /** - * Returns the number of attachments that are in pending upload states to the archive cdn. + * Returns sum of the file sizes of attachments that are not fully uploaded to the archive CDN. */ - fun getPendingArchiveUploadCount(): Long { + fun getPendingArchiveUploadBytes(): Long { return readableDatabase - .count() - .from(TABLE_NAME) - .where("$ARCHIVE_TRANSFER_STATE IN (${ArchiveTransferState.UPLOAD_IN_PROGRESS.value}, ${ArchiveTransferState.COPY_PENDING.value})") - .run() + .rawQuery( + """ + SELECT SUM($DATA_SIZE) + FROM ( + SELECT DISTINCT $ARCHIVE_MEDIA_ID, $DATA_SIZE + FROM $TABLE_NAME + WHERE $ARCHIVE_TRANSFER_STATE NOT IN (${ArchiveTransferState.FINISHED.value}, ${ArchiveTransferState.PERMANENT_FAILURE.value}) + ) + """.trimIndent() + ) .readToSingleLong() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt index 15158b09fe..9e4883611f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/ArchiveAttachmentBackfillJob.kt @@ -48,7 +48,7 @@ class ArchiveAttachmentBackfillJob private constructor(parameters: Parameters) : SignalDatabase.attachments.createKeyIvDigestForAttachmentsThatNeedArchiveUpload() - ArchiveUploadProgress.onAttachmentsStarted(jobs.size.toLong()) + ArchiveUploadProgress.onAttachmentsStarted(SignalDatabase.attachments.getPendingArchiveUploadBytes()) Log.i(TAG, "Adding ${jobs.size} jobs to backfill attachments.") AppDependencies.jobManager.addAll(jobs) diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt index 8543f734c2..bea37bec3c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/BackupMessagesJob.kt @@ -23,6 +23,7 @@ import org.thoughtcrime.securesms.jobs.protos.BackupMessagesJobData import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.providers.BlobProvider import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import org.whispersystems.signalservice.internal.push.AttachmentUploadForm import java.io.File import java.io.FileInputStream @@ -89,6 +90,10 @@ class BackupMessagesJob private constructor( override fun getFactoryKey(): String = KEY + override fun onAdded() { + ArchiveUploadProgress.begin() + } + override fun onFailure() { if (!isCanceled) { Log.w(TAG, "Failed to backup user messages. Marking failure state.") @@ -109,6 +114,8 @@ class BackupMessagesJob private constructor( BackupFileResult.Retry -> return Result.retry(defaultBackoff()) } + ArchiveUploadProgress.onMessageBackupCreated(tempBackupFile.length()) + this.syncTime = currentTime this.dataFile = tempBackupFile.path @@ -134,8 +141,16 @@ class BackupMessagesJob private constructor( is NetworkResult.ApplicationError -> throw result.throwable } + val progressListener = object : SignalServiceAttachment.ProgressListener { + override fun onAttachmentProgress(total: Long, progress: Long) { + ArchiveUploadProgress.onMessageBackupUploadProgress(total, progress) + } + + override fun shouldCancel(): Boolean = isCanceled + } + FileInputStream(tempBackupFile).use { - when (val result = BackupRepository.uploadBackupFile(backupSpec, it, tempBackupFile.length(), ArchiveUploadProgress.ArchiveUploadProgressListener { isCanceled })) { + when (val result = BackupRepository.uploadBackupFile(backupSpec, it, tempBackupFile.length(), progressListener)) { is NetworkResult.Success -> { Log.i(TAG, "Successfully uploaded backup file.") SignalStore.backup.hasBackupBeenUploaded = true @@ -204,7 +219,6 @@ class BackupMessagesJob private constructor( BlobProvider.getInstance().clearTemporaryBackupsDirectory(AppDependencies.application) - ArchiveUploadProgress.begin() val tempBackupFile = BlobProvider.getInstance().forTemporaryBackup(AppDependencies.application) val outputStream = FileOutputStream(tempBackupFile) @@ -244,8 +258,6 @@ class BackupMessagesJob private constructor( return BackupFileResult.Failure } - ArchiveUploadProgress.onMessageBackupCreated() - return BackupFileResult.Success(tempBackupFile, currentTime) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt index 62993e14ff..6adc120d27 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/CopyAttachmentToArchiveJob.kt @@ -141,7 +141,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A ArchiveThumbnailUploadJob.enqueueIfNecessary(attachmentId) SignalStore.backup.usedBackupMediaSpace += AttachmentCipherStreamUtil.getCiphertextLength(PaddingInputStream.getPaddedSize(attachment.size)) - ArchiveUploadProgress.onAttachmentFinished() + ArchiveUploadProgress.onAttachmentFinished(attachmentId) } return result @@ -156,7 +156,7 @@ class CopyAttachmentToArchiveJob private constructor(private val attachmentId: A SignalDatabase.attachments.setArchiveTransferState(attachmentId, AttachmentTable.ArchiveTransferState.TEMPORARY_FAILURE) } - ArchiveUploadProgress.onAttachmentFinished() + ArchiveUploadProgress.onAttachmentFinished(attachmentId) } class Factory : Job.Factory { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt index 5f5504137f..9ba6dda0d1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/UploadAttachmentToArchiveJob.kt @@ -12,6 +12,7 @@ import org.signal.protos.resumableuploads.ResumableUpload import org.thoughtcrime.securesms.attachments.AttachmentId import org.thoughtcrime.securesms.attachments.AttachmentUploadUtil import org.thoughtcrime.securesms.attachments.DatabaseAttachment +import org.thoughtcrime.securesms.backup.ArchiveUploadProgress import org.thoughtcrime.securesms.backup.v2.BackupRepository import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.database.SignalDatabase @@ -24,6 +25,7 @@ import org.thoughtcrime.securesms.net.SignalNetwork import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.archive.ArchiveMediaUploadFormStatusCodes import org.whispersystems.signalservice.api.attachment.AttachmentUploadResult +import org.whispersystems.signalservice.api.messages.SignalServiceAttachment import java.io.IOException import java.net.ProtocolException import kotlin.random.Random @@ -135,7 +137,11 @@ class UploadAttachmentToArchiveJob private constructor( context = context, attachment = attachment, uploadSpec = uploadSpec!!, - cancellationSignal = { this.isCanceled } + cancellationSignal = { this.isCanceled }, + progressListener = object : SignalServiceAttachment.ProgressListener { + override fun onAttachmentProgress(total: Long, progress: Long) = ArchiveUploadProgress.onAttachmentProgress(attachmentId, progress) + override fun shouldCancel() = this@UploadAttachmentToArchiveJob.isCanceled + } ) } catch (e: IOException) { Log.e(TAG, "[$attachmentId] Failed to get attachment stream.", e) diff --git a/app/src/main/protowire/KeyValue.proto b/app/src/main/protowire/KeyValue.proto index 9b4ee7a308..c7f61160bf 100644 --- a/app/src/main/protowire/KeyValue.proto +++ b/app/src/main/protowire/KeyValue.proto @@ -19,9 +19,9 @@ message LeastActiveLinkedDevice { message ArchiveUploadProgressState { enum State { None = 0; - BackingUpMessages = 1; - UploadingMessages = 2; - UploadingAttachments = 3; + Export = 1; + UploadBackupFile = 2; + UploadMedia = 3; } /** @@ -41,7 +41,11 @@ message ArchiveUploadProgressState { } State state = 1; - uint64 completedAttachments = 2; - uint64 totalAttachments = 3; - BackupPhase backupPhase = 4; + BackupPhase backupPhase = 2; + uint64 frameExportCount = 3; + uint64 frameTotalCount = 4; + uint64 backupFileUploadedBytes = 5; + uint64 backupFileTotalBytes = 6; + uint64 mediaUploadedBytes = 7; + uint64 mediaTotalBytes = 8; } \ No newline at end of file diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index b465a40b41..899482f70c 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -8043,9 +8043,9 @@ Preparing backup… - - Processing %1$s of %2$s (%3$d%%) message - Processing %1$s of %2$s (%3$d%%) messages + + Processing %1$s of %2$s message (%3$d%%) + Processing %1$s of ~%2$s messages (%3$d%%) diff --git a/build.gradle.kts b/build.gradle.kts index d30fb7377d..de922e365e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -147,11 +147,11 @@ tasks.register("checkStopship") { } } } - .awaitAll() + .awaitAll() } if (stopshipFiles.isNotEmpty()) { throw GradleException("STOPSHIP found! Files: $stopshipFiles") } } -} \ No newline at end of file +} diff --git a/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt b/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt index 45d252d67a..e013909b63 100644 --- a/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt +++ b/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt @@ -7,7 +7,9 @@ package org.signal.core.util import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.conflate +import kotlinx.coroutines.flow.filterNot import kotlinx.coroutines.flow.onEach import kotlin.time.Duration @@ -16,9 +18,20 @@ import kotlin.time.Duration * * You can think of this like debouncing, but with "checkpoints" so that even if you have a constant stream of values, * you'll still get an emission every [timeout] (unlike debouncing, which will only emit once the stream settles down). + * + * You can specify an optional [emitImmediately] function that will indicate whether an emission should skip throttling and + * be emitted immediately. This lambda should be stateless, as it may be called multiple times for each item. */ -fun Flow.throttleLatest(timeout: Duration): Flow { - return this - .conflate() - .onEach { delay(timeout) } +fun Flow.throttleLatest(timeout: Duration, emitImmediately: (T) -> Boolean = { false }): Flow { + val rootFlow = this + return channelFlow { + rootFlow + .onEach { if (emitImmediately(it)) send(it) } + .filterNot { emitImmediately(it) } + .conflate() + .collect { + send(it) + delay(timeout) + } + } } diff --git a/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt b/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt index 43bd2f63e7..bc74c3b140 100644 --- a/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt +++ b/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt @@ -60,4 +60,20 @@ class FlowExtensionsTests { assertEquals(listOf(1, 5, 10, 15, 20, 25, 30), output) } + + @Test + fun `throttleLatest - respects skipThrottle`() = runTest { + val testFlow = flow { + for (i in 1..30) { + emit(i) + delay(10) + } + } + + val output = testFlow + .throttleLatest(50.milliseconds) { it in setOf(2, 3, 4, 26, 27, 28) } + .toList() + + assertEquals(listOf(1, 2, 3, 4, 5, 10, 15, 20, 25, 26, 27, 28, 30), output) + } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java index 018925b5c9..8f649baeb2 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/PushServiceSocket.java @@ -1080,9 +1080,9 @@ public class PushServiceSocket { public void uploadBackupFile(AttachmentUploadForm uploadForm, String resumableUploadUrl, InputStream data, long dataLength, ProgressListener progressListener) throws IOException { if (uploadForm.cdn == 2) { - uploadToCdn2(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), null, null); + uploadToCdn2(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null); } else { - uploadToCdn3(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), null, null, uploadForm.headers); + uploadToCdn3(resumableUploadUrl, data, "application/octet-stream", dataLength, false, new NoCipherOutputStreamFactory(), progressListener, null, uploadForm.headers); } }