diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt index 14087abe91..a792be7c18 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentUploadJob.kt @@ -5,7 +5,9 @@ package org.thoughtcrime.securesms.jobs import android.text.TextUtils +import okhttp3.internal.http2.StreamResetException import org.greenrobot.eventbus.EventBus +import org.signal.core.util.concurrent.SignalExecutors import org.signal.core.util.inRoundedDays import org.signal.core.util.logging.Log import org.signal.core.util.mebiBytes @@ -23,6 +25,7 @@ import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec import org.thoughtcrime.securesms.jobs.protos.AttachmentUploadJobData +import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.mms.MmsException import org.thoughtcrime.securesms.net.NotPushRegisteredException import org.thoughtcrime.securesms.recipients.Recipient @@ -39,6 +42,7 @@ import java.util.Optional import java.util.concurrent.TimeUnit import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.minutes /** * Uploads an attachment without alteration. @@ -56,6 +60,8 @@ class AttachmentUploadJob private constructor( private val TAG = Log.tag(AttachmentUploadJob::class.java) + private val NETWORK_RESET_THRESHOLD = 1.minutes.inWholeMilliseconds + val UPLOAD_REUSE_THRESHOLD = 3.days.inWholeMilliseconds /** @@ -162,6 +168,19 @@ class AttachmentUploadJob private constructor( ArchiveThumbnailUploadJob.enqueueIfNecessary(databaseAttachment.attachmentId) } } + } catch (e: StreamResetException) { + val lastReset = SignalStore.misc.lastNetworkResetDueToStreamResets + val now = System.currentTimeMillis() + + if (lastReset > now || lastReset + NETWORK_RESET_THRESHOLD > now) { + Log.w(TAG, "Our existing connections is getting repeatedly denied by the server, reset network to establish new connections") + AppDependencies.resetNetwork() + SignalStore.misc.lastNetworkResetDueToStreamResets = now + } else { + Log.i(TAG, "Stream reset during upload, not resetting network yet, last reset: $lastReset") + } + + throw e } catch (e: NonSuccessfulResumableUploadResponseCodeException) { if (e.code == 400) { Log.w(TAG, "Failed to upload due to a 400 when getting resumable upload information. Clearing upload spec.", e) @@ -217,9 +236,18 @@ class AttachmentUploadJob private constructor( uploadSpec = resumableUploadSpec, cancellationSignal = { isCanceled }, progressListener = object : SignalServiceAttachment.ProgressListener { + private var lastUpdate = 0L + private val updateRate = 500.milliseconds.inWholeMilliseconds + override fun onAttachmentProgress(total: Long, progress: Long) { - EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) - notification?.progress = (progress.toFloat() / total) + val now = System.currentTimeMillis() + if (now < lastUpdate || lastUpdate + updateRate < now || progress >= total) { + SignalExecutors.BOUNDED_IO.execute { + EventBus.getDefault().postSticky(PartProgressEvent(attachment, PartProgressEvent.Type.NETWORK, total, progress)) + notification?.progress = (progress.toFloat() / total) + } + lastUpdate = now + } } override fun shouldCancel(): Boolean { diff --git a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.kt b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.kt index 8c32330d4b..a5f4463233 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/keyvalue/MiscellaneousValues.kt @@ -37,6 +37,7 @@ class MiscellaneousValues internal constructor(store: KeyValueStore) : SignalSto private const val LEAST_ACTIVE_LINKED_DEVICE = "misc.linked_device.least_active" private const val NEXT_DATABASE_ANALYSIS_TIME = "misc.next_database_analysis_time" private const val LOCK_SCREEN_ATTEMPT_COUNT = "misc.lock_screen_attempt_count" + private const val LAST_NETWORK_RESET_TIME = "misc.last_network_reset_time" } public override fun onFirstEverAppLaunch() { @@ -258,4 +259,6 @@ class MiscellaneousValues internal constructor(store: KeyValueStore) : SignalSto fun incrementLockScreenAttemptCount() { lockScreenAttemptCount++ } + + var lastNetworkResetDueToStreamResets: Long by longValue(LAST_NETWORK_RESET_TIME, 0L) } diff --git a/core-util-jvm/src/main/java/org/signal/core/util/StreamUtil.java b/core-util-jvm/src/main/java/org/signal/core/util/StreamUtil.java index 2ac01b443e..466b57919a 100644 --- a/core-util-jvm/src/main/java/org/signal/core/util/StreamUtil.java +++ b/core-util-jvm/src/main/java/org/signal/core/util/StreamUtil.java @@ -106,6 +106,7 @@ public final class StreamUtil { } in.close(); + out.flush(); out.close(); return total; diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt index b1475a6978..85cf552d30 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/internal/push/http/DigestingRequestBody.kt @@ -50,7 +50,7 @@ class DigestingRequestBody( outputStreamFactory.createFor(inner) } - val buffer = ByteArray(8192) + val buffer = ByteArray(16 * 1024) var read: Int while (inputStream.read(buffer, 0, buffer.size).also { read = it } != -1) {