From c4846d92daf276a5f91eac6e71f09f340fcf5bfe Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Thu, 18 Jun 2026 15:12:44 -0400 Subject: [PATCH] Add request backfill attachment support for linked devices. --- ...MessageProcessorTest_attachmentBackfill.kt | 338 ++++++++++++ .../securesms/components/AudioView.java | 67 ++- .../securesms/components/DocumentView.java | 59 +- .../transfercontrols/TransferControlView.kt | 71 ++- .../TransferControlViewState.kt | 3 +- .../transfercontrols/TransferControls.kt | 25 +- .../conversation/v2/ConversationFragment.kt | 24 + .../securesms/database/AttachmentTable.kt | 44 ++ .../securesms/jobs/AttachmentBackfill.kt | 289 ++++++++++ .../securesms/jobs/AttachmentDownloadJob.kt | 70 ++- .../securesms/jobs/JobManagerFactories.java | 1 + ...MultiDeviceAttachmentBackfillRequestJob.kt | 139 +++++ .../MultiDeviceAttachmentBackfillUpdateJob.kt | 10 +- .../securesms/jobs/RestoreAttachmentJob.kt | 14 +- .../messages/SyncMessageProcessor.kt | 143 ++++- app/src/main/protowire/JobData.proto | 4 + app/src/main/res/values/strings.xml | 6 + .../transfercontrols/TransferControlsTest.kt | 80 ++- .../securesms/jobs/AttachmentBackfillTest.kt | 507 ++++++++++++++++++ .../api/SignalServiceMessageSender.java | 10 + .../multidevice/SignalServiceSyncMessage.java | 59 +- 21 files changed, 1863 insertions(+), 100 deletions(-) create mode 100644 app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_attachmentBackfill.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentBackfill.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillRequestJob.kt create mode 100644 app/src/test/java/org/thoughtcrime/securesms/jobs/AttachmentBackfillTest.kt diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_attachmentBackfill.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_attachmentBackfill.kt new file mode 100644 index 0000000000..286c2c92bf --- /dev/null +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/messages/SyncMessageProcessorTest_attachmentBackfill.kt @@ -0,0 +1,338 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.messages + +import androidx.test.ext.junit.runners.AndroidJUnit4 +import assertk.assertThat +import assertk.assertions.isEqualTo +import assertk.assertions.isNotNull +import okio.ByteString.Companion.toByteString +import org.junit.After +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.runner.RunWith +import org.signal.core.models.database.AttachmentId +import org.signal.core.util.Base64 +import org.thoughtcrime.securesms.database.AttachmentTable +import org.thoughtcrime.securesms.database.MessageTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.recipients.Recipient +import org.thoughtcrime.securesms.recipients.RecipientId +import org.thoughtcrime.securesms.testing.MessageContentFuzzer +import org.thoughtcrime.securesms.testing.SignalActivityRule +import org.thoughtcrime.securesms.util.MediaUtil +import org.whispersystems.signalservice.api.push.SignalServiceAddress +import org.whispersystems.signalservice.internal.push.AddressableMessage +import org.whispersystems.signalservice.internal.push.AttachmentPointer +import org.whispersystems.signalservice.internal.push.Content +import org.whispersystems.signalservice.internal.push.ConversationIdentifier +import org.whispersystems.signalservice.internal.push.DataMessage +import org.whispersystems.signalservice.internal.push.SyncMessage +import java.util.UUID + +@Suppress("ClassName") +@RunWith(AndroidJUnit4::class) +class SyncMessageProcessorTest_attachmentBackfill { + + @get:Rule + val harness = SignalActivityRule(createGroup = true) + + private lateinit var messageHelper: MessageHelper + private var originalDeviceId: Int = SignalServiceAddress.DEFAULT_DEVICE_ID + + @Before + fun setUp() { + messageHelper = MessageHelper(harness) + originalDeviceId = SignalStore.account.deviceId + // Make this device a linked device so backfill response handling activates. + SignalStore.account.deviceId = 2 + } + + @After + fun tearDown() { + SignalStore.account.deviceId = originalDeviceId + messageHelper.tearDown() + } + + @Test + fun fresh_pointer_updates_row_and_resets_transfer_state() { + val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice) + SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + val pointer = freshPointer(cdnNumber = 3, cdnKey = "fresh-key", size = 1234, uploadTimestamp = 9_999_000L) + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf(SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = pointer)) + ) + + // transferState is not asserted: the forced download job's onAdded() races it PENDING -> STARTED. The pointer fields + // are written synchronously and are stable. + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single() + assertThat(refreshed.remoteLocation).isEqualTo("fresh-key") + assertThat(refreshed.cdn.cdnNumber).isEqualTo(3) + assertThat(refreshed.size).isEqualTo(1234L) + assertThat(refreshed.uploadTimestamp).isEqualTo(9_999_000L) + } + + @Test + fun terminal_error_marks_permanent_failure() { + val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice) + SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf( + SyncMessage.AttachmentBackfillResponse.AttachmentData( + status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR + ) + ) + ) + + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single() + assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE) + } + + @Test + fun pending_status_leaves_row_unchanged() { + val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice) + SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf( + SyncMessage.AttachmentBackfillResponse.AttachmentData( + status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.PENDING + ) + ) + ) + + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single() + assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_FAILED) + } + + @Test + fun message_not_found_error_marks_attachments_retryable_failed() { + val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice) + SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + error = SyncMessage.AttachmentBackfillResponse.Error.MESSAGE_NOT_FOUND + ) + + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single() + assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_FAILED) + } + + @Test + fun primary_device_ignores_backfill_response() { + SignalStore.account.deviceId = SignalServiceAddress.DEFAULT_DEVICE_ID + + val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice) + SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf( + SyncMessage.AttachmentBackfillResponse.AttachmentData( + status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR + ) + ) + ) + + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single() + assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_FAILED) + } + + @Test + fun multi_attachment_response_matches_positionally_with_mixed_status() { + val messageId = insertIncomingMessageWith(messageHelper.alice, listOf(incomingImagePointer(), incomingImagePointer())) + val body = SignalDatabase.attachments.getAttachmentsForMessage(messageId).sortedBy { it.displayOrder } + assertThat(body.size).isEqualTo(2) + body.forEach { SignalDatabase.attachments.setTransferProgressFailed(it.attachmentId, messageId) } + + // Response is a positional array: index 0 -> body[0] (fresh pointer), index 1 -> body[1] (terminal). + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf( + SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "first-key", size = 11, uploadTimestamp = 111L)), + SyncMessage.AttachmentBackfillResponse.AttachmentData(status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR) + ) + ) + + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).sortedBy { it.displayOrder } + // remoteLocation proves index 0 routed to body[0]. transferState is not asserted: it races the download job's onAdded(). + assertThat(refreshed[0].remoteLocation).isEqualTo("first-key") + assertThat(refreshed[0].cdn.cdnNumber).isEqualTo(3) + assertThat(refreshed[1].transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE) + } + + @Test + fun long_text_slot_is_applied_independently_of_the_body() { + val messageId = insertIncomingMessageWith(messageHelper.alice, listOf(incomingImagePointer(), incomingLongTextPointer())) + val all = SignalDatabase.attachments.getAttachmentsForMessage(messageId) + all.forEach { SignalDatabase.attachments.setTransferProgressFailed(it.attachmentId, messageId) } + + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf(SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "body-key", size = 22, uploadTimestamp = 222L))), + longText = SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "long-text-key", size = 33, uploadTimestamp = 333L)) + ) + + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId) + val bodyRow = refreshed.single { it.contentType != MediaUtil.LONG_TEXT } + val longTextRow = refreshed.single { it.contentType == MediaUtil.LONG_TEXT } + // The positional `attachments` array fills the body row and the separate `longText` slot fills the long-text row, + // with no cross-contamination. transferState is not asserted: it races the download job's onAdded(). + assertThat(bodyRow.remoteLocation).isEqualTo("body-key") + assertThat(longTextRow.remoteLocation).isEqualTo("long-text-key") + } + + @Test + fun remote_attachment_list_longer_than_local_skips_extras() { + val messageId = insertIncomingMessageWith(messageHelper.alice, listOf(incomingImagePointer())) + val attachmentId = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single().attachmentId + SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + deliverBackfillResponse( + sender = messageHelper.alice, + sentTimestamp = sentTimestampFor(messageId), + conversationId = messageHelper.alice, + attachmentData = listOf( + SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "only-key", size = 44, uploadTimestamp = 444L)), + SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "extra-key", size = 55, uploadTimestamp = 555L)) + ) + ) + + // The single local row is routed from index 0; the extra index-1 entry has no body[1] and must be skipped, not throw. + val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single() + assertThat(refreshed.remoteLocation).isEqualTo("only-key") + } + + private fun insertIncomingMediaMessage(sender: RecipientId): Pair { + messageHelper.startTime = messageHelper.nextStartTime() + val sentTimestamp = messageHelper.startTime + + val content = Content.Builder() + .dataMessage( + DataMessage.Builder() + .timestamp(sentTimestamp) + .attachments(listOf(MessageContentFuzzer.attachmentPointer())) + .build() + ) + .build() + + messageHelper.processor.process( + envelope = MessageContentFuzzer.envelope(sentTimestamp), + content = content, + metadata = MessageContentFuzzer.envelopeMetadata(source = sender, destination = harness.self.id), + serverDeliveredTimestamp = sentTimestamp + 10 + ) + + val syncMessageId = MessageTable.SyncMessageId(sender, sentTimestamp) + val messageId = SignalDatabase.messages.getMessageIdOrNull(syncMessageId) + assertThat(messageId, name = "messageId").isNotNull() + + val attachment = SignalDatabase.attachments.getAttachmentsForMessage(messageId!!).single() + return messageId to attachment.attachmentId + } + + private fun insertIncomingMessageWith(sender: RecipientId, pointers: List): Long { + messageHelper.startTime = messageHelper.nextStartTime() + val sentTimestamp = messageHelper.startTime + + val content = Content.Builder() + .dataMessage( + DataMessage.Builder() + .timestamp(sentTimestamp) + .attachments(pointers) + .build() + ) + .build() + + messageHelper.processor.process( + envelope = MessageContentFuzzer.envelope(sentTimestamp), + content = content, + metadata = MessageContentFuzzer.envelopeMetadata(source = sender, destination = harness.self.id), + serverDeliveredTimestamp = sentTimestamp + 10 + ) + + val messageId = SignalDatabase.messages.getMessageIdOrNull(MessageTable.SyncMessageId(sender, sentTimestamp)) + assertThat(messageId, name = "messageId").isNotNull() + return messageId!! + } + + private fun incomingImagePointer(): AttachmentPointer = MessageContentFuzzer.attachmentPointer().newBuilder().contentType("image/jpeg").build() + + private fun incomingLongTextPointer(): AttachmentPointer = MessageContentFuzzer.attachmentPointer().newBuilder().contentType(MediaUtil.LONG_TEXT).build() + + private fun sentTimestampFor(messageId: Long): Long { + return SignalDatabase.messages.getMessageRecord(messageId).dateSent + } + + private fun deliverBackfillResponse( + sender: RecipientId, + sentTimestamp: Long, + conversationId: RecipientId, + attachmentData: List = emptyList(), + longText: SyncMessage.AttachmentBackfillResponse.AttachmentData? = null, + error: SyncMessage.AttachmentBackfillResponse.Error? = null + ) { + messageHelper.startTime = messageHelper.nextStartTime() + val envelopeTimestamp = messageHelper.startTime + + val response = SyncMessage.AttachmentBackfillResponse( + targetMessage = AddressableMessage( + authorServiceIdBinary = Recipient.resolved(sender).requireAci().toByteString(), + sentTimestamp = sentTimestamp + ), + targetConversation = ConversationIdentifier( + threadServiceIdBinary = Recipient.resolved(conversationId).requireAci().toByteString() + ), + attachments = if (error == null) SyncMessage.AttachmentBackfillResponse.AttachmentDataList(attachments = attachmentData, longText = longText) else null, + error = error + ) + + val content = Content.Builder() + .syncMessage(SyncMessage.Builder().attachmentBackfillResponse(response).build()) + .build() + + messageHelper.processor.process( + envelope = MessageContentFuzzer.envelope(envelopeTimestamp, serverGuid = UUID.randomUUID()), + content = content, + metadata = MessageContentFuzzer.envelopeMetadata(source = harness.self.id, destination = harness.self.id, sourceDeviceId = 1), + serverDeliveredTimestamp = envelopeTimestamp + 10 + ) + } + + private fun freshPointer(cdnNumber: Int, cdnKey: String, size: Int, uploadTimestamp: Long): AttachmentPointer { + return AttachmentPointer.Builder() + .cdnKey(cdnKey) + .cdnNumber(cdnNumber) + .key(Base64.decode("AAAAAAAA").toByteString()) + .digest(ByteArray(32) { it.toByte() }.toByteString()) + .size(size) + .uploadTimestamp(uploadTimestamp) + .contentType("image/jpeg") + .build() + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/AudioView.java b/app/src/main/java/org/thoughtcrime/securesms/components/AudioView.java index 471fbe63b9..26dd4be214 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/AudioView.java +++ b/app/src/main/java/org/thoughtcrime/securesms/components/AudioView.java @@ -39,6 +39,7 @@ import org.thoughtcrime.securesms.audio.AudioWaveForms; import org.thoughtcrime.securesms.components.voice.VoiceNotePlaybackState; import org.thoughtcrime.securesms.database.AttachmentTable; import org.thoughtcrime.securesms.events.PartProgressEvent; +import org.thoughtcrime.securesms.jobs.AttachmentBackfill; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.mms.AudioSlide; import org.thoughtcrime.securesms.mms.SlideClickListener; @@ -82,9 +83,11 @@ public final class AudioView extends FrameLayout { private boolean isPlaying; private long durationMillis; private AudioSlide audioSlide; + private boolean showControls; private Callbacks callbacks; - private Disposable disposable = Disposable.disposed(); + private Disposable disposable = Disposable.disposed(); + private Disposable awaitingDisposable = Disposable.disposed(); private final Observer playbackStateObserver = this::onPlaybackState; @@ -159,6 +162,12 @@ public final class AudioView extends FrameLayout { protected void onAttachedToWindow() { super.onAttachedToWindow(); if (!EventBus.getDefault().isRegistered(this)) EventBus.getDefault().register(this); + + awaitingDisposable = AttachmentBackfill.awaitingChanges() + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(ignored -> { + if (audioSlide != null) presentTransferControls(audioSlide, showControls); + }, t -> Log.w(TAG, "Error observing backfill awaiting state.", t)); } @Override @@ -166,6 +175,7 @@ public final class AudioView extends FrameLayout { super.onDetachedFromWindow(); EventBus.getDefault().unregister(this); disposable.dispose(); + awaitingDisposable.dispose(); } public void setProgressAndPlayBackgroundTint(@ColorInt int color) { @@ -197,27 +207,8 @@ public final class AudioView extends FrameLayout { } } - if (showControls && audio.isPendingDownload()) { - controlToggle.displayQuick(downloadContainer); - seekBar.setEnabled(false); - downloadButton.setOnClickListener(new DownloadClickedListener(audio)); - if (circleProgress != null) { - if (circleProgress.isSpinning()) circleProgress.stopSpinning(); - circleProgress.setVisibility(View.GONE); - } - } else if (showControls && audio.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED) { - controlToggle.displayQuick(progressAndPlay); - seekBar.setEnabled(false); - showPlayButton(); - if (circleProgress != null) { - circleProgress.setVisibility(View.VISIBLE); - circleProgress.spin(); - } - } else { - seekBar.setEnabled(true); - if (circleProgress != null && circleProgress.isSpinning()) circleProgress.stopSpinning(); - showPlayButton(); - } + this.showControls = showControls; + presentTransferControls(audio, showControls); if (seekBar instanceof WaveFormSeekBarView) { WaveFormSeekBarView waveFormView = (WaveFormSeekBarView) seekBar; @@ -263,6 +254,38 @@ public final class AudioView extends FrameLayout { } } + private void presentTransferControls(@NonNull AudioSlide audio, boolean showControls) { + DatabaseAttachment dbAttachment = audio.asAttachment() instanceof DatabaseAttachment ? (DatabaseAttachment) audio.asAttachment() : null; + + if (dbAttachment != null && dbAttachment.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId)) { + AttachmentBackfill.onAttachmentTerminal(dbAttachment.attachmentId, dbAttachment.mmsId); + } + + boolean awaitingBackfill = dbAttachment != null && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId); + + if (showControls && audio.isPendingDownload() && !awaitingBackfill) { + controlToggle.displayQuick(downloadContainer); + seekBar.setEnabled(false); + downloadButton.setOnClickListener(new DownloadClickedListener(audio)); + if (circleProgress != null) { + if (circleProgress.isSpinning()) circleProgress.stopSpinning(); + circleProgress.setVisibility(View.GONE); + } + } else if (showControls && (audio.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED || awaitingBackfill)) { + controlToggle.displayQuick(progressAndPlay); + seekBar.setEnabled(false); + showPlayButton(); + if (circleProgress != null) { + circleProgress.setVisibility(View.VISIBLE); + if (!circleProgress.isSpinning()) circleProgress.spin(); + } + } else { + seekBar.setEnabled(true); + if (circleProgress != null && circleProgress.isSpinning()) circleProgress.stopSpinning(); + showPlayButton(); + } + } + public void setDownloadClickListener(@Nullable SlideClickListener listener) { this.downloadListener = listener; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/DocumentView.java b/app/src/main/java/org/thoughtcrime/securesms/components/DocumentView.java index e8b2b4de7f..ab021ba19f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/DocumentView.java +++ b/app/src/main/java/org/thoughtcrime/securesms/components/DocumentView.java @@ -28,6 +28,7 @@ import org.thoughtcrime.securesms.R; import org.thoughtcrime.securesms.attachments.DatabaseAttachment; import org.thoughtcrime.securesms.database.AttachmentTable; import org.thoughtcrime.securesms.events.PartProgressEvent; +import org.thoughtcrime.securesms.jobs.AttachmentBackfill; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.mms.Slide; import org.thoughtcrime.securesms.mms.SlideClickListener; @@ -36,6 +37,9 @@ import org.whispersystems.signalservice.api.util.OptionalUtil; import java.util.Collections; +import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers; +import io.reactivex.rxjava3.disposables.Disposable; + public class DocumentView extends FrameLayout { private static final String TAG = Log.tag(DocumentView.class); @@ -55,6 +59,9 @@ public class DocumentView extends FrameLayout { private @Nullable SlidesClickedListener cancelTransferClickListener; private @Nullable SlidesClickedListener resendTransferClickListener; private @Nullable Slide documentSlide; + private boolean showControls; + + private Disposable awaitingDisposable = Disposable.disposed(); public DocumentView(@NonNull Context context) { this(context, null); @@ -98,12 +105,19 @@ public class DocumentView extends FrameLayout { if (!EventBus.getDefault().isRegistered(this)) { EventBus.getDefault().register(this); } + + awaitingDisposable = AttachmentBackfill.awaitingChanges() + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(ignored -> { + if (documentSlide != null) presentTransferControls(documentSlide, showControls); + }, t -> Log.w(TAG, "Error observing backfill awaiting state.", t)); } @Override protected void onDetachedFromWindow() { super.onDetachedFromWindow(); EventBus.getDefault().unregister(this); + awaitingDisposable.dispose(); } public void setDownloadClickListener(@Nullable SlideClickListener listener) { @@ -126,22 +140,8 @@ public class DocumentView extends FrameLayout { final boolean showControls, final boolean showSingleLineFilename) { - if (showControls && documentSlide.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED) { - controlToggle.displayQuick(stopUploadButton); - downloadProgress.spin(); - stopUploadButton.setOnClickListener(new CancelTransferListener(documentSlide)); - } else if (showControls && documentSlide.getUri() != null && documentSlide.isPendingDownload()) { - controlToggle.displayQuick(uploadButton); - uploadButton.setOnClickListener(new ResendTransferClickListener(documentSlide)); - if (downloadProgress.isSpinning()) downloadProgress.stopSpinning(); - } else if (showControls && documentSlide.getUri() == null && documentSlide.isPendingDownload()) { - controlToggle.displayQuick(downloadButton); - downloadButton.setOnClickListener(new DownloadClickedListener(documentSlide)); - if (downloadProgress.isSpinning()) downloadProgress.stopSpinning(); - } else { - controlToggle.displayQuick(iconContainer); - if (downloadProgress.isSpinning()) downloadProgress.stopSpinning(); - } + this.showControls = showControls; + presentTransferControls(documentSlide, showControls); this.documentSlide = documentSlide; @@ -172,6 +172,33 @@ public class DocumentView extends FrameLayout { } } + private void presentTransferControls(@NonNull Slide documentSlide, boolean showControls) { + DatabaseAttachment dbAttachment = documentSlide.asAttachment() instanceof DatabaseAttachment ? (DatabaseAttachment) documentSlide.asAttachment() : null; + + if (dbAttachment != null && dbAttachment.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId)) { + AttachmentBackfill.onAttachmentTerminal(dbAttachment.attachmentId, dbAttachment.mmsId); + } + + boolean awaitingBackfill = dbAttachment != null && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId); + + if (showControls && (documentSlide.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED || awaitingBackfill)) { + controlToggle.displayQuick(stopUploadButton); + if (!downloadProgress.isSpinning()) downloadProgress.spin(); + stopUploadButton.setOnClickListener(awaitingBackfill ? null : new CancelTransferListener(documentSlide)); + } else if (showControls && documentSlide.getUri() != null && documentSlide.isPendingDownload()) { + controlToggle.displayQuick(uploadButton); + uploadButton.setOnClickListener(new ResendTransferClickListener(documentSlide)); + if (downloadProgress.isSpinning()) downloadProgress.stopSpinning(); + } else if (showControls && documentSlide.getUri() == null && documentSlide.isPendingDownload()) { + controlToggle.displayQuick(downloadButton); + downloadButton.setOnClickListener(new DownloadClickedListener(documentSlide)); + if (downloadProgress.isSpinning()) downloadProgress.stopSpinning(); + } else { + controlToggle.displayQuick(iconContainer); + if (downloadProgress.isSpinning()) downloadProgress.stopSpinning(); + } + } + @Override public void setFocusable(boolean focusable) { super.setFocusable(focusable); diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlView.kt b/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlView.kt index 5144c7932c..db3de83a48 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlView.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlView.kt @@ -12,9 +12,14 @@ import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.setValue import androidx.compose.ui.platform.AbstractComposeView import androidx.compose.ui.platform.ViewCompositionStrategy +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.Subscribe import org.greenrobot.eventbus.ThreadMode +import org.signal.core.models.database.AttachmentId import org.signal.core.ui.compose.theme.SignalTheme import org.signal.core.util.ByteSize import org.signal.core.util.ThrottledDebouncer @@ -25,6 +30,7 @@ import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.components.RecyclerViewParentTransitionController import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.events.PartProgressEvent +import org.thoughtcrime.securesms.jobs.AttachmentBackfill import org.thoughtcrime.securesms.mms.Slide import org.thoughtcrime.securesms.util.MediaUtil import java.util.UUID @@ -48,6 +54,11 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att private val progressUpdateDebouncer = ThrottledDebouncer(100) + private var previousRenderState: TransferControlsRenderState = TransferControlsRenderState.Gone + + /** Active while view is attached */ + private var renderScope: CoroutineScope? = null + /** Per-instance id so a single recycled view can be isolated in logcat when [VERBOSE_DEVELOPMENT_LOGGING] is on. */ private val viewId by lazy { UUID.randomUUID().toString().take(8) } @@ -72,39 +83,56 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att override fun onAttachedToWindow() { super.onAttachedToWindow() if (!EventBus.getDefault().isRegistered(this)) EventBus.getDefault().register(this) + + renderScope = CoroutineScope(Dispatchers.Main.immediate).also { scope -> + scope.launch { + AttachmentBackfill.awaiting.collect { renderState() } + } + } } override fun onDetachedFromWindow() { super.onDetachedFromWindow() EventBus.getDefault().unregister(this) + renderScope?.cancel() + renderScope = null } fun isGone(): Boolean { - return TransferControls.deriveRenderState(state) is TransferControlsRenderState.Gone + val state = TransferControls.deriveRenderState(state = state, awaitingAttachmentIds = state.slides.attachmentIdsAwaitingBackfill()) + return state is TransferControlsRenderState.Gone } private fun updateState(stateFactory: (TransferControlViewState) -> TransferControlViewState) { - val newState = stateFactory(state) + state = stateFactory(state) + renderState() + } - val oldRender = TransferControls.deriveRenderState(state) - val newRender = TransferControls.deriveRenderState(newState) - state = newState + /** + * Derives and applies the render state by combining [state] with the current awaiting attachments. Both the state and + * awaiting paths change events funnel through here. + */ + private fun renderState() { + val oldRenderState = previousRenderState + val newRenderState = TransferControls.deriveRenderState(state, state.slides.attachmentIdsAwaitingBackfill()) + previousRenderState = newRenderState - if (oldRender == newRender) { + if (oldRenderState == newRenderState) { return } - verboseLog { "render $oldRender -> $newRender slides=[${slidesAsLogString(newState.slides)}]" } + verboseLog { "render $oldRenderState -> $newRenderState slides=[${slidesAsLogString(state.slides)}]" } - if (oldRender is TransferControlsRenderState.InProgress && oldRender.isProgressOnlyDifference(newRender)) { + // Only throttle noisy progress changes + if (oldRenderState is TransferControlsRenderState.InProgress && oldRenderState.isProgressOnlyDifference(newRenderState)) { progressUpdateDebouncer.publish { - renderState = newRender + renderState = newRenderState visibility = VISIBLE } } else { progressUpdateDebouncer.clear() - renderState = newRender - if (newRender !is TransferControlsRenderState.Gone) { + renderState = newRenderState + if (newRenderState !is TransferControlsRenderState.Gone) { visibility = VISIBLE } } @@ -133,6 +161,7 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att fun setSlides(slides: List) { require(slides.isNotEmpty()) { "Must provide at least one slide." } + clearResolvedBackfills(slides) updateState { state -> val isNewSlideSet = !isUpdateToExistingSet(state, slides) val networkProgress: MutableMap = if (isNewSlideSet) HashMap() else state.networkProgress.toMutableMap() @@ -201,10 +230,6 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att updateState { it.copy(isVisible = isVisible) } } - fun setAwaitingPrimaryResponse(awaiting: Boolean) { - updateState { it.copy(awaitingPrimaryResponse = awaiting) } - } - override fun setFocusable(focusable: Boolean) { super.setFocusable(false) updateState { it.copy(isFocusable = focusable) } @@ -215,6 +240,22 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att updateState { it.copy(isClickable = clickable) } } + private fun List.attachmentIdsAwaitingBackfill(): Set { + return this + .mapNotNullTo(HashSet()) { (it.asAttachment() as? DatabaseAttachment)?.attachmentId } + .apply { retainAll(AttachmentBackfill.awaiting.value) } + } + + /** Tells [AttachmentBackfill] to stop awaiting any backfilled attachment that this view now sees as DONE. */ + private fun clearResolvedBackfills(slides: List) { + for (slide in slides) { + val attachment = slide.asAttachment() as? DatabaseAttachment ?: continue + if (attachment.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE && attachment.attachmentId in AttachmentBackfill.awaiting.value) { + AttachmentBackfill.onAttachmentTerminal(attachment.attachmentId, attachment.mmsId) + } + } + } + private fun MutableMap.applyProgress(attachment: Attachment, update: Progress) { if (update.completed < 0.bytes) { remove(attachment) diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlViewState.kt b/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlViewState.kt index c4310c26b4..6cb7c6b719 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlViewState.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlViewState.kt @@ -21,6 +21,5 @@ data class TransferControlViewState( val networkProgress: Map = HashMap(), val compressionProgress: Map = HashMap(), val playableWhileDownloading: Boolean = false, - val isUpload: Boolean = false, - val awaitingPrimaryResponse: Boolean = false + val isUpload: Boolean = false ) diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControls.kt b/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControls.kt index 9d3e5818f2..2491bc8225 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControls.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControls.kt @@ -5,9 +5,11 @@ package org.thoughtcrime.securesms.components.transfercontrols +import org.signal.core.models.database.AttachmentId import org.signal.core.util.ByteSize import org.signal.core.util.bytes import org.thoughtcrime.securesms.attachments.Attachment +import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.mms.Slide import org.thoughtcrime.securesms.util.MediaUtil @@ -42,7 +44,10 @@ object TransferControls { data class Bytes(val completed: ByteSize, val total: ByteSize) : ProgressLabel } - fun deriveRenderState(state: TransferControlViewState): TransferControlsRenderState { + fun deriveRenderState( + state: TransferControlViewState, + awaitingAttachmentIds: Set = emptySet() + ): TransferControlsRenderState { if (state.slides.isEmpty()) { return TransferControlsRenderState.Gone } @@ -55,14 +60,17 @@ object TransferControls { return TransferControlsRenderState.Gone } - if (state.awaitingPrimaryResponse) { + // If any attachments are being backfilled, overwrite with in progress state to maintain spinner + val awaitingBackfill = state.slides.any { (it.asAttachment() as? DatabaseAttachment)?.attachmentId in awaitingAttachmentIds } + if (awaitingBackfill) { + val downloading = state.slides.any { it.transferState == AttachmentTable.TRANSFER_PROGRESS_STARTED } return TransferControlsRenderState.InProgress( isUpload = false, placement = if (state.slides.size == 1) Placement.CENTER else Placement.CORNER, - progress = null, + progress = if (downloading) calculateProgress(state) else null, showPlayButton = false, - cancelable = false, - label = null + cancelable = downloading, + label = if (downloading) progressLabel(state) else null ) } @@ -173,10 +181,6 @@ object TransferControls { return weightedProgress / weightedTotal } - /** - * Internal, view-free mirror of the legacy state machine. Kept verbatim from the original view to preserve behavior; the - * resulting [Mode] is mapped to a [TransferControlsRenderState] by [deriveRenderState]. - */ private fun deriveMode(state: TransferControlViewState): Mode { if (state.slides.isEmpty()) { return Mode.GONE @@ -279,9 +283,6 @@ object TransferControls { private const val UPLOAD_TASK_WEIGHT = 1 - /** - * A weighting compared to [UPLOAD_TASK_WEIGHT] - */ private const val COMPRESSION_TASK_WEIGHT = 3 private enum class Mode { diff --git a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt index 78d3e59609..c1911597cf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/conversation/v2/ConversationFragment.kt @@ -281,6 +281,7 @@ import org.thoughtcrime.securesms.groups.ui.migration.GroupsV1MigrationInfoBotto import org.thoughtcrime.securesms.groups.ui.migration.GroupsV1MigrationSuggestionsDialog import org.thoughtcrime.securesms.groups.v2.GroupBlockJoinRequestResult import org.thoughtcrime.securesms.invites.InviteActions +import org.thoughtcrime.securesms.jobs.AttachmentBackfill import org.thoughtcrime.securesms.jobs.ServiceOutageDetectionJob import org.thoughtcrime.securesms.keyboard.KeyboardPage import org.thoughtcrime.securesms.keyboard.KeyboardPagerFragment @@ -1408,6 +1409,16 @@ class ConversationFragment : } } + lifecycleScope.launch { + lifecycle.repeatOnLifecycle(Lifecycle.State.RESUMED) { + AttachmentBackfill.failures.collect { failure -> + if (failure.threadId == args.threadId) { + showAttachmentBackfillFailureDialog(failure.reason) + } + } + } + } + if (TextSecurePreferences.getServiceOutage(context)) { AppDependencies.jobManager.add(ServiceOutageDetectionJob()) } @@ -3104,6 +3115,19 @@ class ConversationFragment : dialogBuilder.show() } + private fun showAttachmentBackfillFailureDialog(reason: AttachmentBackfill.FailureReason) { + val messageRes = when (reason) { + AttachmentBackfill.FailureReason.TIMEOUT -> R.string.ConversationFragment_attachment_backfill_timeout + AttachmentBackfill.FailureReason.NOT_FOUND -> R.string.ConversationFragment_attachment_backfill_not_found + } + + MaterialAlertDialogBuilder(requireContext()) + .setTitle(R.string.ConversationFragment_attachment_backfill_failed_title) + .setMessage(messageRes) + .setPositiveButton(android.R.string.ok, null) + .show() + } + private fun handleDisplayDetails(conversationMessage: ConversationMessage) { val recipientSnapshot = viewModel.recipientSnapshot ?: return chatRouter.goToChatDetail(MainNavigationDetailLocation.Chats.MessageDetails(recipientSnapshot.id, MessageId(conversationMessage.messageRecord.id))) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt index 1bc176d0e2..250f553f60 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/AttachmentTable.kt @@ -1723,6 +1723,50 @@ class AttachmentTable( } } + /** + * Replaces the remote pointer fields on [attachmentId] with the values from a primary device's + * backfill response and resets the transfer state to pending so a fresh download can run. + */ + fun updatePointerFromBackfill(attachmentId: AttachmentId, attachment: Attachment): Boolean { + val remoteLocation = attachment.remoteLocation + if (remoteLocation.isNullOrBlank()) { + Log.w(TAG, "[updatePointerFromBackfill] Attachment has no remote location for $attachmentId.") + return false + } + + val remoteKey = attachment.remoteKey + if (remoteKey.isNullOrBlank()) { + Log.w(TAG, "[updatePointerFromBackfill] Attachment missing key for $attachmentId.") + return false + } + + val remoteDigest = attachment.remoteDigest + if (remoteDigest == null) { + Log.w(TAG, "[updatePointerFromBackfill] Attachment missing digest for $attachmentId.") + return false + } + + val values = contentValuesOf( + TRANSFER_STATE to TRANSFER_PROGRESS_PENDING, + CDN_NUMBER to attachment.cdn.cdnNumber, + REMOTE_LOCATION to remoteLocation, + REMOTE_KEY to remoteKey, + REMOTE_DIGEST to remoteDigest, + REMOTE_INCREMENTAL_DIGEST to attachment.incrementalDigest, + REMOTE_INCREMENTAL_DIGEST_CHUNK_SIZE to attachment.incrementalMacChunkSize, + DATA_SIZE to attachment.size, + UPLOAD_TIMESTAMP to attachment.uploadTimestamp + ) + + val updateCount = writableDatabase + .update(TABLE_NAME) + .values(values) + .where("$ID = ?", attachmentId.id) + .run() + + return updateCount > 0 + } + /** * Updates the attachment (and all attachments that share the same data file) with a new length. */ diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentBackfill.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentBackfill.kt new file mode 100644 index 0000000000..dc1f4b9380 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentBackfill.kt @@ -0,0 +1,289 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import androidx.annotation.VisibleForTesting +import io.reactivex.rxjava3.core.Observable +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.update +import kotlinx.coroutines.launch +import kotlinx.coroutines.rx3.asObservable +import org.signal.core.models.database.AttachmentId +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.attachments.DatabaseAttachment +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.database.model.MessageRecord +import org.thoughtcrime.securesms.database.model.MmsMessageRecord +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.util.MediaUtil +import java.util.concurrent.ConcurrentHashMap +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds + +/** + * Orchestrates outbound attachment backfill requests and the awaiting-state that drives the UI controls. + * + * [awaiting] is keyed by [AttachmentId] and combined with the slides in [TransferControlView]. The flag is set in + * [maybeRequest] and cleared in [onAttachmentTerminal] when the backfill download terminates in failure or is rendered + * in success. + * + * Dedup, timeout, threadId, and the failure dialog are per-message. + */ +object AttachmentBackfill { + + private val TAG = Log.tag(AttachmentBackfill::class) + + /** Suppress duplicate requests for the same message; bounded so a lost request can be re-driven by a later retry. */ + private val DEDUP_REQUEST_BACKFILL_WINDOW = 30.seconds.inWholeMilliseconds + + /** Time to wait for an initial response from the primary before showing a check network error */ + private val INITIAL_RESPONSE_TIMEOUT = 30.seconds + + /** Hard stop to wait on a primary acknowledged pending attachment before showing an error */ + private val PENDING_RESPONSE_TIMEOUT = 2.minutes + + private val messageBackfillLastRequestedAt: ConcurrentHashMap = ConcurrentHashMap() + + private val messageThreads: ConcurrentHashMap = ConcurrentHashMap() + + private val messageTimeoutJobs: ConcurrentHashMap = ConcurrentHashMap() + + private val messageAttachments: ConcurrentHashMap> = ConcurrentHashMap() + + private val _awaiting = MutableStateFlow>(emptySet()) + + /** Attachments currently awaiting a primary backfill (request sent, not yet re-downloaded) */ + val awaiting: StateFlow> = _awaiting.asStateFlow() + + private val _failures = MutableSharedFlow(extraBufferCapacity = 16) + + /** One-shot stream of backfill failures for a foreground surface to react to (e.g. show a dialog). */ + val failures: SharedFlow = _failures.asSharedFlow() + + /** The scope the response timeouts run on. Overridable so tests can drive the timeout with virtual time. */ + @VisibleForTesting + internal var timeoutScope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + + /** + * Considers a backfill request for [messageId] after a permanent download failure. The attachment is flagged + * [awaiting] even when the request send is deduped, so each failed part of a multi-attachment message marks itself. + */ + @JvmStatic + fun maybeRequest(messageId: Long, attachment: DatabaseAttachment) { + if (SignalStore.account.isPrimaryDevice) { + return + } + + val record = SignalDatabase.messages.getMessageRecordOrNull(messageId) + if (record == null) { + Log.d(TAG, "[$messageId] Skipping backfill: message missing.") + return + } + + if (!isEligibleForBackfill(record, attachment)) { + Log.d(TAG, "[$messageId] Skipping backfill request for ineligible attachment kind.") + return + } + + markAwaiting(messageId, attachment.attachmentId) + + val now = System.currentTimeMillis() + val previous = messageBackfillLastRequestedAt[messageId] + if (previous != null && now - previous < DEDUP_REQUEST_BACKFILL_WINDOW) { + Log.d(TAG, "[$messageId] Request already sent ${now - previous} ms ago; attachment flagged, not re-requesting.") + return + } + messageBackfillLastRequestedAt[messageId] = now + + Log.i(TAG, "[$messageId] Enqueuing attachment backfill request.") + AppDependencies.jobManager.add(MultiDeviceAttachmentBackfillRequestJob(messageId)) + + startTimeout(messageId, record.threadId, INITIAL_RESPONSE_TIMEOUT) + } + + /** + * Manages only the response timeout: extend it while the primary still reports work pending, else cancel it. + */ + @JvmStatic + fun onResponseProcessed(messageId: Long, anyPending: Boolean) { + if (anyPending) { + val threadId = messageThreads[messageId] + if (threadId == null) { + Log.d(TAG, "[$messageId] Pending response for a message we're no longer tracking; ignoring.") + return + } + + Log.i(TAG, "[$messageId] Backfill response still pending; extending timeout.") + startTimeout(messageId, threadId, PENDING_RESPONSE_TIMEOUT) + } else { + Log.i(TAG, "[$messageId] Backfill response final; awaiting re-download(s).") + cancelTimeout(messageId) + } + } + + /** + * Stop awaiting an attachment whose backfill reached a terminal error state or attachment has been rendered. + * Only drops the message's bookkeeping once none of its attachments are awaiting. + */ + @JvmStatic + fun onAttachmentTerminal(attachmentId: AttachmentId, messageId: Long) { + if (!_awaiting.value.contains(attachmentId)) { + return + } + + Log.i(TAG, "[$messageId] Backfill resolved for $attachmentId.") + _awaiting.update { it - attachmentId } + + val remaining = messageAttachments.computeIfPresent(messageId) { _, set -> + (set - attachmentId).ifEmpty { null } + } + + if (remaining == null) { + clearMessage(messageId) + } + } + + /** Primary could not find the message; stop awaiting all of its attachments and surface a not-found failure. */ + @JvmStatic + fun onResponseMessageNotFound(messageId: Long, threadId: Long) { + Log.w(TAG, "[$messageId] Backfill response reported message not found.") + clearMessageAndAwaiting(messageId) + _failures.tryEmit(BackfillFailure(messageId, threadId, FailureReason.NOT_FOUND)) + } + + @JvmStatic + fun clearPending(messageId: Long) { + clearMessageAndAwaiting(messageId) + } + + private fun markAwaiting(messageId: Long, attachmentId: AttachmentId) { + messageAttachments.merge(messageId, setOf(attachmentId)) { old, added -> old + added } + _awaiting.update { it + attachmentId } + } + + private fun startTimeout(messageId: Long, threadId: Long, timeout: Duration) { + messageThreads[messageId] = threadId + + messageTimeoutJobs.remove(messageId)?.cancel() + + val timeoutJob = timeoutScope.launch { + delay(timeout) + + // Conditional remove: fails if a response replaced or cleared our entry, so we never fire a stale timeout. + if (messageTimeoutJobs.remove(messageId, coroutineContext[Job])) { + Log.w(TAG, "[$messageId] Backfill request timed out after $timeout.") + clearMessageAndAwaiting(messageId) + _failures.tryEmit(BackfillFailure(messageId, threadId, FailureReason.TIMEOUT)) + } + } + messageTimeoutJobs[messageId] = timeoutJob + } + + /** Cancels the pending timeout for [messageId] without otherwise touching its wait state. */ + private fun cancelTimeout(messageId: Long) { + messageTimeoutJobs.remove(messageId)?.cancel() + } + + /** Drops all per-message bookkeeping. Does not touch [_awaiting], see [clearMessageAndAwaiting]. */ + private fun clearMessage(messageId: Long) { + messageBackfillLastRequestedAt.remove(messageId) + messageThreads.remove(messageId) + messageAttachments.remove(messageId) + messageTimeoutJobs.remove(messageId)?.cancel() + } + + private fun clearMessageAndAwaiting(messageId: Long) { + val ids = messageAttachments[messageId] + if (!ids.isNullOrEmpty()) { + _awaiting.update { it - ids } + } + clearMessage(messageId) + } + + /** + * The eligible attachments of a message, display-ordered. The single source of truth for the protocol set, so the + * responder (which to re-upload) and requester (how to match the response) agree. See [backfillContractForMessage]. + */ + @JvmStatic + fun backfillAttachmentsForMessage(messageId: Long): List { + val record = SignalDatabase.messages.getMessageRecordOrNull(messageId) ?: return emptyList() + return SignalDatabase.attachments + .getAttachmentsForMessage(messageId) + .filter { isEligibleForBackfill(record, it) } + .sortedBy { it.displayOrder } + } + + /** + * Splits the full on-diskl set to mirror the wire format, splitting eligible normal attachments from long text. + */ + @JvmStatic + fun backfillContractForMessage(messageId: Long): BackfillContract { + val all = backfillAttachmentsForMessage(messageId) + return BackfillContract( + bodyAttachments = all.filterNot { it.contentType == MediaUtil.LONG_TEXT }, + longTextAttachment = all.firstOrNull { it.contentType == MediaUtil.LONG_TEXT } + ) + } + + @JvmStatic + fun isEligibleForBackfill(record: MessageRecord, attachment: DatabaseAttachment): Boolean { + if (record is MmsMessageRecord && record.storyType.isStory) { + return false + } + + if (attachment.quote) { + return false + } + + if (attachment.isSticker || MediaUtil.isLongTextType(attachment.contentType)) { + return true + } + + if (record is MmsMessageRecord) { + val ineligibleIds = buildSet { + record.linkPreviews.mapNotNullTo(this) { it.attachmentId } + record.sharedContacts.mapNotNullTo(this) { it.avatar?.attachmentId } + } + + if (attachment.attachmentId in ineligibleIds) { + return false + } + } + + return true + } + + @JvmStatic + fun awaitingChanges(): Observable> = awaiting.asObservable() + + @JvmStatic + fun isAwaitingBackfill(attachmentId: AttachmentId): Boolean = _awaiting.value.contains(attachmentId) + + enum class FailureReason { + /** The primary did not respond before the timeout. */ + TIMEOUT, + + /** The primary responded but could not find the message to re-upload. */ + NOT_FOUND + } + + data class BackfillFailure(val messageId: Long, val threadId: Long, val reason: FailureReason) + + /** Body attachments (the positional `attachments` array) and the optional long-text slot of the wire response. */ + data class BackfillContract(val bodyAttachments: List, val longTextAttachment: DatabaseAttachment?) +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt index 479356a1fa..357af8d2c5 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/AttachmentDownloadJob.kt @@ -62,7 +62,8 @@ class AttachmentDownloadJob private constructor( parameters: Parameters, private val messageId: Long, private val attachmentId: AttachmentId, - private val forceDownload: Boolean + private val forceDownload: Boolean, + private val requestSource: RequestSource ) : BaseJob(parameters) { companion object { @@ -72,6 +73,7 @@ class AttachmentDownloadJob private constructor( private const val KEY_MESSAGE_ID = "message_id" private const val KEY_ATTACHMENT_ID = "part_row_id" private const val KEY_FORCE_DOWNLOAD = "part_manual" + private const val KEY_SOURCE = "requestSource" @JvmStatic fun constructQueueString(attachmentId: AttachmentId): String { @@ -113,7 +115,8 @@ class AttachmentDownloadJob private constructor( val downloadJob = AttachmentDownloadJob( messageId = databaseAttachment.mmsId, attachmentId = databaseAttachment.attachmentId, - forceDownload = true + forceDownload = true, + requestSource = RequestSource.USER ) AppDependencies.jobManager.add(downloadJob) downloadJob.id @@ -144,9 +147,23 @@ class AttachmentDownloadJob private constructor( } } - constructor(messageId: Long, attachmentId: AttachmentId, forceDownload: Boolean) : this(messageId, attachmentId, forceDownload, forceDownload, forceDownload) + @JvmOverloads + constructor( + messageId: Long, + attachmentId: AttachmentId, + forceDownload: Boolean, + requestSource: RequestSource = RequestSource.AUTO + ) : this(messageId, attachmentId, forceDownload, forceDownload, forceDownload, requestSource) - constructor(messageId: Long, attachmentId: AttachmentId, forceDownload: Boolean, skipInCallConstraint: Boolean, isHighPriority: Boolean) : this( + @JvmOverloads + constructor( + messageId: Long, + attachmentId: AttachmentId, + forceDownload: Boolean, + skipInCallConstraint: Boolean, + isHighPriority: Boolean, + requestSource: RequestSource = RequestSource.AUTO + ) : this( Parameters.Builder() .setQueue(constructQueueString(attachmentId)) .addConstraint(NetworkConstraint.KEY) @@ -157,7 +174,8 @@ class AttachmentDownloadJob private constructor( .build(), messageId, attachmentId, - forceDownload + forceDownload, + requestSource ) override fun serialize(): ByteArray? { @@ -165,6 +183,7 @@ class AttachmentDownloadJob private constructor( .putLong(KEY_MESSAGE_ID, messageId) .putLong(KEY_ATTACHMENT_ID, attachmentId.id) .putBoolean(KEY_FORCE_DOWNLOAD, forceDownload) + .putString(KEY_SOURCE, requestSource.name) .serialize() } @@ -482,10 +501,48 @@ class AttachmentDownloadJob private constructor( private fun markFailed(messageId: Long, attachmentId: AttachmentId) { SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId) + + if (requestSource == RequestSource.BACKFILL) { + AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId) + } else { + maybeRequestBackfill() + } } private fun markPermanentlyFailed(messageId: Long, attachmentId: AttachmentId) { SignalDatabase.attachments.setTransferProgressPermanentFailure(attachmentId, messageId) + + if (requestSource == RequestSource.BACKFILL) { + AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId) + } else { + maybeRequestBackfill() + } + } + + private fun maybeRequestBackfill() { + if (SignalStore.account.isPrimaryDevice || requestSource != RequestSource.USER) { + return + } + + val attachment = SignalDatabase.attachments.getAttachment(attachmentId) ?: return + + if (SignalStore.backup.backsUpMedia && attachment.dataHash != null && attachment.transferState == AttachmentTable.TRANSFER_PROGRESS_FAILED) { + Log.i(TAG, "[$attachmentId] Backs up media and have a plaintext hash; attempting archive restore before backfill") + RestoreAttachmentJob.forManualRestore(attachment) + return + } + + AttachmentBackfill.maybeRequest(messageId, attachment) + } + + enum class RequestSource { + AUTO, + USER, + BACKFILL; + + companion object { + fun fromName(name: String?): RequestSource = entries.firstOrNull { it.name == name } ?: AUTO + } } class Factory : Job.Factory { @@ -495,7 +552,8 @@ class AttachmentDownloadJob private constructor( parameters = parameters, messageId = data.getLong(KEY_MESSAGE_ID), attachmentId = AttachmentId(data.getLong(KEY_ATTACHMENT_ID)), - forceDownload = data.getBoolean(KEY_FORCE_DOWNLOAD) + forceDownload = data.getBoolean(KEY_FORCE_DOWNLOAD), + requestSource = RequestSource.fromName(data.getStringOrDefault(KEY_SOURCE, null)) ) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 6980530b8f..02da6d7ddd 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -210,6 +210,7 @@ public final class JobManagerFactories { put(MarkerJob.KEY, new MarkerJob.Factory()); put(MessageSendLogCleanupJob.KEY, new MessageSendLogCleanupJob.Factory()); put(MultiDeviceAttachmentBackfillMissingJob.KEY, new MultiDeviceAttachmentBackfillMissingJob.Factory()); + put(MultiDeviceAttachmentBackfillRequestJob.KEY, new MultiDeviceAttachmentBackfillRequestJob.Factory()); put(MultiDeviceAttachmentBackfillUpdateJob.KEY, new MultiDeviceAttachmentBackfillUpdateJob.Factory()); put(MultiDeviceBlockedUpdateJob.KEY, new MultiDeviceBlockedUpdateJob.Factory()); put(MultiDeviceCallLinkSyncJob.KEY, new MultiDeviceCallLinkSyncJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillRequestJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillRequestJob.kt new file mode 100644 index 0000000000..b8088e51b6 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillRequestJob.kt @@ -0,0 +1,139 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import okio.ByteString.Companion.toByteString +import org.signal.core.util.logging.Log +import org.signal.core.util.orNull +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint +import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint +import org.thoughtcrime.securesms.jobs.protos.MultiDeviceAttachmentBackfillRequestJobData +import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.recipients.Recipient +import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException +import org.whispersystems.signalservice.api.messages.multidevice.SignalServiceSyncMessage +import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException +import org.whispersystems.signalservice.internal.push.AddressableMessage +import org.whispersystems.signalservice.internal.push.ConversationIdentifier +import org.whispersystems.signalservice.internal.push.SyncMessage +import java.io.IOException +import kotlin.time.Duration.Companion.seconds + +/** + * Sent by a linked device to request the primary to re-upload attachments when we fail + * to restore them automatically but the user has specifically requested we retry. + */ +class MultiDeviceAttachmentBackfillRequestJob private constructor( + parameters: Parameters, + private val messageId: Long +) : Job(parameters) { + + companion object { + private val TAG = Log.tag(MultiDeviceAttachmentBackfillRequestJob::class) + + const val KEY = "MultiDeviceAttachmentBackfillRequestJob" + } + + constructor(messageId: Long) : this( + Parameters.Builder() + .setLifespan(30.seconds.inWholeMilliseconds) + .setMaxAttempts(3) + .addConstraint(NetworkConstraint.KEY) + .addConstraint(SealedSenderConstraint.KEY) + .build(), + messageId + ) + + override fun getFactoryKey(): String = KEY + + override fun serialize(): ByteArray { + return MultiDeviceAttachmentBackfillRequestJobData(messageId = messageId).encode() + } + + override fun run(): Result { + if (SignalStore.account.isPrimaryDevice) { + Log.w(TAG, "Primary device should never request attachment backfill. Dropping.") + return Result.failure() + } + + val record = SignalDatabase.messages.getMessageRecordOrNull(messageId) + if (record == null) { + Log.w(TAG, "[$messageId] No message record; cannot build backfill request.") + return Result.failure() + } + + val author = record.fromRecipient + val authorServiceId = author.aci.orNull()?.toByteString() + if (authorServiceId == null) { + Log.w(TAG, "[$messageId] No serviceId for author ${author.id}; cannot build backfill request.") + return Result.failure() + } + + val threadRecipient = SignalDatabase.threads.getRecipientForThreadId(record.threadId) + if (threadRecipient == null) { + Log.w(TAG, "[$messageId] No thread recipient for thread ${record.threadId}; cannot build backfill request.") + return Result.failure() + } + + val conversation = threadRecipient.toBackfillConversationId() + if (conversation == null) { + Log.w(TAG, "[$messageId] No conversation identifier for recipient ${threadRecipient.id}; cannot build backfill request.") + return Result.failure() + } + + val request = SyncMessage.AttachmentBackfillRequest( + targetMessage = AddressableMessage( + authorServiceIdBinary = authorServiceId, + sentTimestamp = record.dateSent + ), + targetConversation = conversation + ) + + val syncMessage = SignalServiceSyncMessage.forAttachmentBackfillRequest(request) + + return try { + val result = AppDependencies.signalServiceMessageSender.sendSyncMessage(syncMessage) + if (result.isSuccess) { + Log.i(TAG, "[${record.dateSent}] Sent attachment backfill request.") + Result.success() + } else { + Log.w(TAG, "[${record.dateSent}] Non-success send result; retrying.") + Result.retry(defaultBackoff()) + } + } catch (e: ServerRejectedException) { + Log.w(TAG, e) + Result.failure() + } catch (e: IOException) { + Log.w(TAG, e) + Result.retry(defaultBackoff()) + } catch (e: UntrustedIdentityException) { + Log.w(TAG, e) + Result.failure() + } + } + + override fun onFailure() = Unit + + private fun Recipient.toBackfillConversationId(): ConversationIdentifier? { + return when { + this.isGroup -> ConversationIdentifier(threadGroupId = this.requireGroupId().decodedId.toByteString()) + this.hasAci -> ConversationIdentifier(threadServiceIdBinary = this.requireAci().toByteString()) + this.hasPni -> ConversationIdentifier(threadServiceIdBinary = this.requirePni().toByteString()) + this.hasE164 -> ConversationIdentifier(threadE164 = this.requireE164()) + else -> null + } + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, serializedData: ByteArray?): MultiDeviceAttachmentBackfillRequestJob { + val data = MultiDeviceAttachmentBackfillRequestJobData.ADAPTER.decode(serializedData!!) + return MultiDeviceAttachmentBackfillRequestJob(parameters, data.messageId) + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillUpdateJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillUpdateJob.kt index 1e7b3a2b94..3761e1a869 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillUpdateJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceAttachmentBackfillUpdateJob.kt @@ -8,13 +8,11 @@ package org.thoughtcrime.securesms.jobs import org.signal.core.util.logging.Log import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.attachments.toAttachmentPointer -import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint import org.thoughtcrime.securesms.jobs.protos.MultiDeviceAttachmentBackfillUpdateJobData -import org.thoughtcrime.securesms.util.MediaUtil import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException import org.whispersystems.signalservice.api.messages.multidevice.SignalServiceSyncMessage import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException @@ -75,11 +73,9 @@ class MultiDeviceAttachmentBackfillUpdateJob( } override fun run(): Result { - val allAttachments = SignalDatabase.attachments.getAttachmentsForMessage(messageId) - val syncAttachments: List = allAttachments - .filterNot { it.quote || it.contentType == MediaUtil.LONG_TEXT } - .sortedBy { it.displayOrder } - val longTextAttachment: DatabaseAttachment? = allAttachments.firstOrNull { it.contentType == MediaUtil.LONG_TEXT } + val contract = AttachmentBackfill.backfillContractForMessage(messageId) + val syncAttachments: List = contract.bodyAttachments + val longTextAttachment: DatabaseAttachment? = contract.longTextAttachment if (syncAttachments.isEmpty() && longTextAttachment == null) { Log.w(TAG, "Failed to find any attachments for the message! Sending a missing response.") diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt index f923974d5d..306abad227 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RestoreAttachmentJob.kt @@ -277,14 +277,14 @@ class RestoreAttachmentJob private constructor( SignalLocalMetrics.ArchiveAttachmentRestore.start(attachmentId) - val progressServiceController = BackupMediaRestoreService.start(context, context.getString(R.string.BackupStatus__restoring_media)) + val progressServiceController = if (!manual) BackupMediaRestoreService.start(context, context.getString(R.string.BackupStatus__restoring_media)) else null if (progressServiceController != null) { progressServiceController.use { retrieveAttachment(messageId, attachmentId, attachment) } } else { - Log.w(TAG, "Continuing without service.") + Log.w(TAG, "Continuing without service. manual: $manual") retrieveAttachment(messageId, attachmentId, attachment) } @@ -506,12 +506,22 @@ class RestoreAttachmentJob private constructor( ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED) } + maybeRequestBackfill() } private fun markPermanentlyFailed(attachmentId: AttachmentId) { ArchiveDatabaseExecutor.runBlocking { SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE) } + maybeRequestBackfill() + } + + private fun maybeRequestBackfill() { + if (SignalStore.account.isPrimaryDevice || !manual) { + return + } + val attachment = SignalDatabase.attachments.getAttachment(attachmentId) ?: return + AttachmentBackfill.maybeRequest(messageId, attachment) } private fun maybePostFailedToDownloadFromArchiveNotification() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/SyncMessageProcessor.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/SyncMessageProcessor.kt index 7873a49b5b..a443f00d6b 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/SyncMessageProcessor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/SyncMessageProcessor.kt @@ -64,6 +64,7 @@ import org.thoughtcrime.securesms.dependencies.KeyTransparencyApi import org.thoughtcrime.securesms.groups.BadGroupIdException import org.thoughtcrime.securesms.groups.GroupChangeBusyException import org.thoughtcrime.securesms.groups.GroupId +import org.thoughtcrime.securesms.jobs.AttachmentBackfill import org.thoughtcrime.securesms.jobs.AttachmentDownloadJob import org.thoughtcrime.securesms.jobs.AttachmentUploadJob import org.thoughtcrime.securesms.jobs.MultiDeviceAttachmentBackfillMissingJob @@ -135,6 +136,7 @@ import org.whispersystems.signalservice.internal.push.EditMessage import org.whispersystems.signalservice.internal.push.Envelope import org.whispersystems.signalservice.internal.push.StoryMessage import org.whispersystems.signalservice.internal.push.SyncMessage +import org.whispersystems.signalservice.internal.push.SyncMessage.AttachmentBackfillResponse import org.whispersystems.signalservice.internal.push.SyncMessage.Blocked import org.whispersystems.signalservice.internal.push.SyncMessage.CallLinkUpdate import org.whispersystems.signalservice.internal.push.SyncMessage.CallLogEvent @@ -190,7 +192,7 @@ object SyncMessageProcessor { syncMessage.callLogEvent != null -> handleSynchronizeCallLogEvent(syncMessage.callLogEvent!!, envelope.clientTimestamp!!) syncMessage.deleteForMe != null -> handleSynchronizeDeleteForMe(context, syncMessage.deleteForMe!!, envelope.clientTimestamp!!, earlyMessageCacheEntry) syncMessage.attachmentBackfillRequest != null -> handleSynchronizeAttachmentBackfillRequest(syncMessage.attachmentBackfillRequest!!, envelope.clientTimestamp!!) - syncMessage.attachmentBackfillResponse != null -> warn(envelope.clientTimestamp!!, "Contains a backfill response, but we don't handle these!") + syncMessage.attachmentBackfillResponse != null -> handleSynchronizeAttachmentBackfillResponse(syncMessage.attachmentBackfillResponse!!, envelope.clientTimestamp!!) syncMessage.usernameChange != null -> handleSynchronizeUsernameChange(envelope.clientTimestamp!!) else -> warn(envelope.clientTimestamp!!, "Contains no known sync types...") } @@ -1739,7 +1741,7 @@ object SyncMessageProcessor { return } - val attachments: List = SignalDatabase.attachments.getAttachmentsForMessage(messageId).filterNot { it.quote }.sortedBy { it.displayOrder } + val attachments: List = AttachmentBackfill.backfillAttachmentsForMessage(messageId) if (attachments.isEmpty()) { warn(timestamp, "[AttachmentBackfillRequest] There were no attachments found for the message! Enqueuing a 'missing' response.") MultiDeviceAttachmentBackfillMissingJob.enqueue(request.targetMessage!!, request.targetConversation!!) @@ -1747,7 +1749,7 @@ object SyncMessageProcessor { } val now = System.currentTimeMillis() - val needsUpload = attachments.filter { now - it.uploadTimestamp > 3.days.inWholeMilliseconds } + val needsUpload = attachments.filter { it.hasData && now - it.uploadTimestamp > 3.days.inWholeMilliseconds } log(timestamp, "[AttachmentBackfillRequest] ${needsUpload.size}/${attachments.size} attachments need to be re-uploaded.") for (attachment in needsUpload) { @@ -1761,6 +1763,99 @@ object SyncMessageProcessor { MultiDeviceAttachmentBackfillUpdateJob.enqueue(request.targetMessage!!, request.targetConversation!!, messageId) } + private fun handleSynchronizeAttachmentBackfillResponse(response: AttachmentBackfillResponse, timestamp: Long) { + if (SignalStore.account.isPrimaryDevice) { + log(timestamp, "[AttachmentBackfillResponse] Primary device ignores attachment backfill response.") + return + } + + if (response.targetMessage == null || response.targetConversation == null) { + warn(timestamp, "[AttachmentBackfillResponse] Missing targetMessage or targetConversation; dropping.") + return + } + + val syncMessageId = response.targetMessage!!.toSyncMessageId(timestamp) + if (syncMessageId == null) { + warn(timestamp, "[AttachmentBackfillResponse] Invalid targetMessage; dropping.") + return + } + + val conversationRecipientId = response.targetConversation!!.toRecipientId() + if (conversationRecipientId == null) { + warn(timestamp, "[AttachmentBackfillResponse] Unable to resolve targetConversation; dropping.") + return + } + + val threadId = SignalDatabase.threads.getThreadIdFor(conversationRecipientId) + if (threadId == null) { + warn(timestamp, "[AttachmentBackfillResponse] No thread for conversation; dropping.") + return + } + + val messageId = SignalDatabase.messages.getMessageIdOrNull(syncMessageId, threadId) + if (messageId == null) { + warn(timestamp, "[AttachmentBackfillResponse] Unable to find local message; dropping.") + return + } + + if (response.error == AttachmentBackfillResponse.Error.MESSAGE_NOT_FOUND) { + log(timestamp, "[AttachmentBackfillResponse] Primary could not find message $messageId; marking attachments failed") + val attachments = AttachmentBackfill.backfillAttachmentsForMessage(messageId) + .filterNot { it.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE } + for (attachment in attachments) { + SignalDatabase.attachments.setTransferProgressFailed(attachment.attachmentId, messageId) + } + AttachmentBackfill.onResponseMessageNotFound(messageId, threadId) + return + } + + val attachmentList = response.attachments + if (attachmentList == null) { + warn(timestamp, "[AttachmentBackfillResponse] Response has neither error nor attachments; dropping.") + return + } + + val contract = AttachmentBackfill.backfillContractForMessage(messageId) + val localBody = contract.bodyAttachments + val localLongText = contract.longTextAttachment + + var anyPending = false + var pointersApplied = 0 + + for ((index, remote) in attachmentList.attachments.withIndex()) { + val local = localBody.getOrNull(index) + if (local == null) { + warn(timestamp, "[AttachmentBackfillResponse] Remote attachment $index has no local row; skipping.") + continue + } + when (applyAttachmentData(local, remote, messageId)) { + // UPDATED/PENDING keep awaiting; the re-download terminates them later. + BackfillApplyResult.PENDING -> anyPending = true + BackfillApplyResult.UPDATED -> pointersApplied++ + // Won't re-download, so stop awaiting now. + BackfillApplyResult.SKIPPED, + BackfillApplyResult.TERMINAL -> AttachmentBackfill.onAttachmentTerminal(local.attachmentId, messageId) + } + } + + if (localLongText != null) { + if (attachmentList.longText != null) { + when (applyAttachmentData(localLongText, attachmentList.longText!!, messageId)) { + BackfillApplyResult.PENDING -> anyPending = true + BackfillApplyResult.UPDATED -> pointersApplied++ + BackfillApplyResult.SKIPPED, + BackfillApplyResult.TERMINAL -> AttachmentBackfill.onAttachmentTerminal(localLongText.attachmentId, messageId) + } + } else { + AttachmentBackfill.onAttachmentTerminal(localLongText.attachmentId, messageId) + } + } + + log(timestamp, "[AttachmentBackfillResponse] Processed response for messageId=$messageId pointersApplied=$pointersApplied anyPending=$anyPending") + + AttachmentBackfill.onResponseProcessed(messageId, anyPending) + } + private fun handleSynchronizePniChangeNumber(envelope: Envelope, metadata: EnvelopeMetadata, pniChangeNumber: SyncMessage.PniChangeNumber) { val timestamp = envelope.clientTimestamp!! @@ -1854,6 +1949,46 @@ object SyncMessageProcessor { AppDependencies.jobManager.add(PreKeysSyncJob.create(forceRotationRequested = true)) } + private fun applyAttachmentData( + local: DatabaseAttachment, + remote: AttachmentBackfillResponse.AttachmentData, + messageId: Long + ): BackfillApplyResult { + if (local.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE) { + return BackfillApplyResult.SKIPPED + } + + if (remote.status == AttachmentBackfillResponse.AttachmentData.Status.PENDING) { + return BackfillApplyResult.PENDING + } + + if (remote.status == AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR) { + SignalDatabase.attachments.setTransferProgressPermanentFailure(local.attachmentId, messageId) + return BackfillApplyResult.TERMINAL + } + + val attachment = remote.attachment?.toPointer() ?: return BackfillApplyResult.SKIPPED + + val updated = SignalDatabase.attachments.updatePointerFromBackfill(local.attachmentId, attachment) + if (!updated) { + SignalDatabase.attachments.setTransferProgressPermanentFailure(local.attachmentId, messageId) + return BackfillApplyResult.TERMINAL + } + + SignalDatabase.runPostSuccessfulTransaction { + AppDependencies.jobManager.add( + AttachmentDownloadJob( + messageId = messageId, + attachmentId = local.attachmentId, + forceDownload = true, + requestSource = AttachmentDownloadJob.RequestSource.BACKFILL + ) + ) + } + + return BackfillApplyResult.UPDATED + } + private fun handleSynchronizedPollCreate( envelope: Envelope, message: DataMessage, @@ -2090,4 +2225,6 @@ object SyncMessageProcessor { return AttachmentTable.SyncAttachmentId(syncMessageId, uuid, digest, plaintextHash) } } + + private enum class BackfillApplyResult { SKIPPED, PENDING, TERMINAL, UPDATED } } diff --git a/app/src/main/protowire/JobData.proto b/app/src/main/protowire/JobData.proto index de3ab021e2..0129b0f07f 100644 --- a/app/src/main/protowire/JobData.proto +++ b/app/src/main/protowire/JobData.proto @@ -286,3 +286,7 @@ message AdminDeleteJobData { message IndividualSendJobV2Data { uint64 messageId = 1; } + +message MultiDeviceAttachmentBackfillRequestJobData { + uint64 messageId = 1; +} diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index 19e1b768c2..fb23def49d 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -721,6 +721,12 @@ Group names Photo failed to download. Try again. + + Can\'t download media + + Check this device and your phone\'s internet connection. Open Signal on your phone, then try downloading again. + + This media is no longer available on your phone and can\'t be downloaded. Keep pinned for… diff --git a/app/src/test/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlsTest.kt b/app/src/test/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlsTest.kt index 33b4d8b32c..a3e3fd87a3 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlsTest.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/components/transfercontrols/TransferControlsTest.kt @@ -16,8 +16,11 @@ import org.junit.Assert.assertNull import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test +import org.signal.core.models.database.AttachmentId import org.signal.core.util.bytes import org.thoughtcrime.securesms.attachments.Attachment +import org.thoughtcrime.securesms.attachments.Cdn +import org.thoughtcrime.securesms.attachments.DatabaseAttachment import org.thoughtcrime.securesms.database.AttachmentTable import org.thoughtcrime.securesms.mms.Slide import org.thoughtcrime.securesms.util.MediaUtil @@ -26,6 +29,9 @@ import org.whispersystems.signalservice.internal.crypto.PaddingInputStream class TransferControlsTest { + /** Distinct default attachmentId per slide so DatabaseAttachment equality (by id) doesn't collapse map keys. */ + private var nextSlideId: Long = 1000L + @Before fun setUp() { mockkStatic(MediaUtil::class) @@ -56,8 +62,9 @@ class TransferControlsTest { @Test fun `awaiting primary single item is centered indeterminate non-cancelable`() { - val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE)), awaitingPrimaryResponse = true) - val render = TransferControls.deriveRenderState(state) as TransferControlsRenderState.InProgress + val id = AttachmentId(1L) + val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = id))) + val render = TransferControls.deriveRenderState(state, setOf(id)) as TransferControlsRenderState.InProgress assertNull(render.progress) assertEquals(TransferControls.Placement.CENTER, render.placement) assertFalse(render.cancelable) @@ -66,19 +73,29 @@ class TransferControlsTest { @Test fun `awaiting primary gallery is corner indeterminate`() { + val first = AttachmentId(1L) + val second = AttachmentId(2L) val state = stateOf( - listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE), slide(AttachmentTable.TRANSFER_NEEDS_RESTORE)), - awaitingPrimaryResponse = true + listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = first), slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = second)) ) - val render = TransferControls.deriveRenderState(state) as TransferControlsRenderState.InProgress + val render = TransferControls.deriveRenderState(state, setOf(first, second)) as TransferControlsRenderState.InProgress assertNull(render.progress) assertEquals(TransferControls.Placement.CORNER, render.placement) } @Test fun `awaiting primary still Gone when not visible`() { - val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE)), awaitingPrimaryResponse = true, isVisible = false) - assertEquals(TransferControlsRenderState.Gone, TransferControls.deriveRenderState(state)) + val id = AttachmentId(1L) + val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = id)), isVisible = false) + assertEquals(TransferControlsRenderState.Gone, TransferControls.deriveRenderState(state, setOf(id))) + } + + @Test + fun `awaiting primary yields to download progress once the re-download starts`() { + val id = AttachmentId(1L) + val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_PROGRESS_STARTED, attachmentId = id))) + val render = TransferControls.deriveRenderState(state, setOf(id)) as TransferControlsRenderState.InProgress + assertTrue(render.cancelable) } @Test @@ -286,9 +303,12 @@ class TransferControlsTest { private fun slide( transferState: Int, hasVideo: Boolean = false, - size: Long = 1024 + size: Long = 1024, + attachmentId: AttachmentId = AttachmentId(nextSlideId++) ): Slide { - val attachment = mockk(relaxed = true) + // asAttachment() returns a real DatabaseAttachment: deriveRenderState reads its attachmentId (a @JvmField, + // so unmockkable) to decide whether the slide is awaiting backfill. + val attachment = databaseAttachment(attachmentId, transferState) val slide = mockk(relaxed = true) every { slide.transferState } returns transferState every { slide.hasVideo() } returns hasVideo @@ -297,13 +317,52 @@ class TransferControlsTest { return slide } + private fun databaseAttachment(attachmentId: AttachmentId, transferProgress: Int): DatabaseAttachment { + return DatabaseAttachment( + attachmentId = attachmentId, + mmsId = 1L, + hasData = false, + hasThumbnail = false, + contentType = "image/jpeg", + transferProgress = transferProgress, + size = 1024L, + fileName = "photo.jpg", + cdn = Cdn.CDN_3, + location = null, + key = null, + digest = null, + incrementalDigest = null, + incrementalMacChunkSize = 0, + fastPreflightId = null, + voiceNote = false, + borderless = false, + videoGif = false, + width = 0, + height = 0, + quote = false, + caption = null, + stickerLocator = null, + blurHash = null, + audioHash = null, + transformProperties = null, + displayOrder = 0, + uploadTimestamp = 0, + dataHash = null, + archiveCdn = null, + thumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE, + archiveTransferState = AttachmentTable.ArchiveTransferState.NONE, + uuid = null, + quoteTargetContentType = null, + metadata = null + ) + } + private fun stateOf( slides: List, isUpload: Boolean = false, playableWhileDownloading: Boolean = false, isVisible: Boolean = true, showSecondaryText: Boolean = true, - awaitingPrimaryResponse: Boolean = false, networkProgress: Map = slides.associate { it.asAttachment() to TransferControlView.Progress(0L.bytes, 1024L.bytes) }, compressionProgress: Map = emptyMap() ): TransferControlViewState { @@ -313,7 +372,6 @@ class TransferControlsTest { playableWhileDownloading = playableWhileDownloading, isVisible = isVisible, showSecondaryText = showSecondaryText, - awaitingPrimaryResponse = awaitingPrimaryResponse, networkProgress = networkProgress, compressionProgress = compressionProgress ) diff --git a/app/src/test/java/org/thoughtcrime/securesms/jobs/AttachmentBackfillTest.kt b/app/src/test/java/org/thoughtcrime/securesms/jobs/AttachmentBackfillTest.kt new file mode 100644 index 0000000000..cb3e08776f --- /dev/null +++ b/app/src/test/java/org/thoughtcrime/securesms/jobs/AttachmentBackfillTest.kt @@ -0,0 +1,507 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.jobs + +import android.app.Application +import io.mockk.every +import io.mockk.mockk +import io.mockk.mockkObject +import io.mockk.unmockkObject +import io.mockk.verify +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.runTest +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.runner.RunWith +import org.robolectric.RobolectricTestRunner +import org.robolectric.annotation.Config +import org.signal.core.models.database.AttachmentId +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.attachments.Cdn +import org.thoughtcrime.securesms.attachments.DatabaseAttachment +import org.thoughtcrime.securesms.contactshare.Contact +import org.thoughtcrime.securesms.database.AttachmentTable +import org.thoughtcrime.securesms.database.MessageTable +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.database.model.MessageRecord +import org.thoughtcrime.securesms.database.model.MmsMessageRecord +import org.thoughtcrime.securesms.database.model.StoryType +import org.thoughtcrime.securesms.dependencies.AppDependencies +import org.thoughtcrime.securesms.jobmanager.JobManager +import org.thoughtcrime.securesms.linkpreview.LinkPreview +import org.thoughtcrime.securesms.testutil.MockAppDependenciesRule +import org.thoughtcrime.securesms.testutil.MockSignalStoreRule +import org.thoughtcrime.securesms.testutil.SystemOutLogger + +@RunWith(RobolectricTestRunner::class) +@Config(manifest = Config.NONE, application = Application::class) +class AttachmentBackfillTest { + + companion object { + private var nextId: Long = 1000L + } + + @get:Rule + val mockSignalStore = MockSignalStoreRule() + + @get:Rule + val appDependencies = MockAppDependenciesRule() + + private lateinit var jobManager: JobManager + private lateinit var messages: MessageTable + private lateinit var record: MessageRecord + + @Before + fun setUp() { + Log.initialize(SystemOutLogger()) + + every { mockSignalStore.account.isLinkedDevice } returns true + every { mockSignalStore.account.isPrimaryDevice } returns false + + jobManager = AppDependencies.jobManager + + messages = mockk(relaxed = true) + mockkObject(SignalDatabase.Companion) + every { SignalDatabase.messages } returns messages + + record = mockk(relaxed = true) + every { record.isOutgoing } returns false + every { messages.getMessageRecordOrNull(any()) } returns record + } + + @After + fun tearDown() { + // Restore the production scope so a test-scheduler scope can't leak into the next test. + AttachmentBackfill.timeoutScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + unmockkObject(SignalDatabase.Companion) + } + + /** + * Builds a real [DatabaseAttachment]. [Attachment.transferState]/[Attachment.quote] and [attachmentId] are read + * directly by the backfill logic and can't be stubbed on a mock (they're @JvmField / final), so we construct a + * concrete instance. + */ + private fun databaseAttachment( + attachmentId: AttachmentId = AttachmentId(1L), + transferProgress: Int = AttachmentTable.TRANSFER_PROGRESS_FAILED, + quote: Boolean = false, + contentType: String = "image/jpeg", + displayOrder: Int = 0 + ): DatabaseAttachment { + return DatabaseAttachment( + attachmentId = attachmentId, + mmsId = 42L, + hasData = false, + hasThumbnail = false, + contentType = contentType, + transferProgress = transferProgress, + size = 1_000L, + fileName = "photo.jpg", + cdn = Cdn.CDN_3, + location = null, + key = null, + digest = null, + incrementalDigest = null, + incrementalMacChunkSize = 0, + fastPreflightId = null, + voiceNote = false, + borderless = false, + videoGif = false, + width = 0, + height = 0, + quote = quote, + caption = null, + stickerLocator = null, + blurHash = null, + audioHash = null, + transformProperties = null, + displayOrder = displayOrder, + uploadTimestamp = 0, + dataHash = null, + archiveCdn = null, + thumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE, + archiveTransferState = AttachmentTable.ArchiveTransferState.NONE, + uuid = null, + quoteTargetContentType = null, + metadata = null + ) + } + + @Test + fun `enqueues when all gates pass`() { + val messageId = nextMessageId() + + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + + verify(exactly = 1) { jobManager.add(any()) } + } + + @Test + fun `skips when primary device`() { + every { mockSignalStore.account.isPrimaryDevice } returns true + val messageId = nextMessageId() + + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + + verify(exactly = 0) { jobManager.add(any()) } + } + + @Test + fun `skips when message record is missing`() { + every { messages.getMessageRecordOrNull(any()) } returns null + val messageId = nextMessageId() + + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + + verify(exactly = 0) { jobManager.add(any()) } + } + + @Test + fun `dedups consecutive requests for the same message`() { + val messageId = nextMessageId() + + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + + verify(exactly = 1) { jobManager.add(any()) } + } + + @Test + fun `clearPending allows requeue for the same message`() { + val messageId = nextMessageId() + + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + AttachmentBackfill.clearPending(messageId) + AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId)) + + verify(exactly = 2) { jobManager.add(any()) } + } + + @Test + fun `skips quote attachments as ineligible kind`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId, quote = true)) + + verify(exactly = 0) { jobManager.add(any()) } + assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `skips attachments on story messages`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + val storyRecord = mockk(relaxed = true) + every { storyRecord.storyType } returns StoryType.STORY_WITH_REPLIES + every { messages.getMessageRecordOrNull(any()) } returns storyRecord + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + + verify(exactly = 0) { jobManager.add(any()) } + assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `skips link-preview attachments as ineligible kind`() { + val messageId = nextMessageId() + val previewId = AttachmentId(messageId) + val preview = mockk() + every { preview.attachmentId } returns previewId + val mmsRecord = mockk(relaxed = true) + every { mmsRecord.storyType } returns StoryType.NONE + every { mmsRecord.linkPreviews } returns listOf(preview) + every { mmsRecord.sharedContacts } returns emptyList() + every { messages.getMessageRecordOrNull(any()) } returns mmsRecord + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = previewId)) + + verify(exactly = 0) { jobManager.add(any()) } + assertFalse(AttachmentBackfill.awaiting.value.contains(previewId)) + } + + @Test + fun `skips contact-share avatar attachments as ineligible kind`() { + val messageId = nextMessageId() + val avatarId = AttachmentId(messageId) + val avatar = mockk() + every { avatar.attachmentId } returns avatarId + val contact = mockk() + every { contact.avatar } returns avatar + val mmsRecord = mockk(relaxed = true) + every { mmsRecord.storyType } returns StoryType.NONE + every { mmsRecord.linkPreviews } returns emptyList() + every { mmsRecord.sharedContacts } returns listOf(contact) + every { messages.getMessageRecordOrNull(any()) } returns mmsRecord + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = avatarId)) + + verify(exactly = 0) { jobManager.add(any()) } + assertFalse(AttachmentBackfill.awaiting.value.contains(avatarId)) + } + + @Test + fun `allows long-text overflow attachments`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId, contentType = "text/x-signal-plain")) + + verify(exactly = 1) { jobManager.add(any()) } + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `backfillContractForMessage builds the positional body and longText contract both ends share`() { + val messageId = nextMessageId() + + val body1Id = AttachmentId(101) + val body2Id = AttachmentId(102) + val longTextId = AttachmentId(103) + val quoteId = AttachmentId(104) + val previewId = AttachmentId(105) + val contactId = AttachmentId(106) + + val body1 = databaseAttachment(attachmentId = body1Id, displayOrder = 1) + val body2 = databaseAttachment(attachmentId = body2Id, displayOrder = 2) + val longText = databaseAttachment(attachmentId = longTextId, displayOrder = 3, contentType = "text/x-signal-plain") + val quote = databaseAttachment(attachmentId = quoteId, displayOrder = 4, quote = true) + val preview = databaseAttachment(attachmentId = previewId, displayOrder = 5) + val contact = databaseAttachment(attachmentId = contactId, displayOrder = 6) + + val linkPreview = mockk() + every { linkPreview.attachmentId } returns previewId + val avatar = mockk() + every { avatar.attachmentId } returns contactId + val sharedContact = mockk() + every { sharedContact.avatar } returns avatar + + val mmsRecord = mockk(relaxed = true) + every { mmsRecord.storyType } returns StoryType.NONE + every { mmsRecord.linkPreviews } returns listOf(linkPreview) + every { mmsRecord.sharedContacts } returns listOf(sharedContact) + every { messages.getMessageRecordOrNull(any()) } returns mmsRecord + + val attachmentsTable = mockk() + every { SignalDatabase.attachments } returns attachmentsTable + // Deliberately out of display order to prove the contract sorts. + every { attachmentsTable.getAttachmentsForMessage(messageId) } returns listOf(body2, longText, quote, body1, preview, contact) + + val contract = AttachmentBackfill.backfillContractForMessage(messageId) + + // contract.body IS the positional wire array both ends share: index 0 -> body1, 1 -> body2. + assertEquals(listOf(body1Id, body2Id), contract.bodyAttachments.map { it.attachmentId }) + assertEquals(longTextId, contract.longTextAttachment?.attachmentId) + } + + @Test + fun `flags the attachment as awaiting on request`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `final response keeps the attachment awaiting until its re-download terminates`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onResponseProcessed(messageId, anyPending = false) + + // The flag clears on the re-download terminal, not on the response — that's what avoids the retry flash. + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `pending response keeps the attachment awaiting`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onResponseProcessed(messageId, anyPending = true) + + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `onAttachmentTerminal clears that attachment`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId) + + assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId)) + } + + @Test + fun `terminal of one attachment leaves the others of the message awaiting`() { + val messageId = nextMessageId() + val first = AttachmentId(messageId) + val second = AttachmentId(-messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = first)) + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = second)) + + AttachmentBackfill.onAttachmentTerminal(first, messageId) + + assertFalse(AttachmentBackfill.awaiting.value.contains(first)) + assertTrue(AttachmentBackfill.awaiting.value.contains(second)) + } + + @Test + fun `last attachment terminal clears bookkeeping so the message can be re-requested`() { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId) + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + + verify(exactly = 2) { jobManager.add(any()) } + } + + @Test + fun `initial response timeout fires a TIMEOUT failure and clears awaiting`() = runTest { + AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler)) + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + every { record.threadId } returns 77L + + val received = mutableListOf() + val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) { + AttachmentBackfill.failures.collect { received += it } + } + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + + testScheduler.advanceUntilIdle() + + assertEquals(1, received.size) + assertEquals(messageId, received.first().messageId) + assertEquals(77L, received.first().threadId) + assertEquals(AttachmentBackfill.FailureReason.TIMEOUT, received.first().reason) + assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId)) + + collector.cancel() + } + + @Test + fun `pending response extends the timeout beyond the initial window`() = runTest { + AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler)) + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + every { record.threadId } returns 5L + + val received = mutableListOf() + val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) { + AttachmentBackfill.failures.collect { received += it } + } + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onResponseProcessed(messageId, anyPending = true) + + // Past the 30s initial window but within the 2m pending window: the extended timer hasn't fired. + testScheduler.advanceTimeBy(60_000) + testScheduler.runCurrent() + assertTrue(received.isEmpty()) + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + + // Past the 2m pending window: now it fires. + testScheduler.advanceUntilIdle() + assertEquals(1, received.size) + assertEquals(AttachmentBackfill.FailureReason.TIMEOUT, received.first().reason) + assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId)) + + collector.cancel() + } + + @Test + fun `final response cancels the timeout so no failure is emitted`() = runTest { + AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler)) + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + + val received = mutableListOf() + val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) { + AttachmentBackfill.failures.collect { received += it } + } + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onResponseProcessed(messageId, anyPending = false) + + testScheduler.advanceUntilIdle() + + assertTrue(received.isEmpty()) + // Awaiting persists until the re-download terminates; the final response alone doesn't clear it. + assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId)) + + collector.cancel() + } + + @Test + fun `pending response for an untracked message starts no timeout`() = runTest { + AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler)) + val messageId = nextMessageId() + + val received = mutableListOf() + val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) { + AttachmentBackfill.failures.collect { received += it } + } + + // No maybeRequest first, so the message isn't tracked: a stray pending response must not arm a timer. + AttachmentBackfill.onResponseProcessed(messageId, anyPending = true) + testScheduler.advanceUntilIdle() + + assertTrue(received.isEmpty()) + + collector.cancel() + } + + @Test + fun `onResponseMessageNotFound clears awaiting and emits NOT_FOUND failure`() = runTest { + val messageId = nextMessageId() + val attachmentId = AttachmentId(messageId) + val received = mutableListOf() + val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) { + AttachmentBackfill.failures.collect { received += it } + } + + AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId)) + AttachmentBackfill.onResponseMessageNotFound(messageId, threadId = 42L) + + assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId)) + assertEquals(1, received.size) + assertEquals(messageId, received.first().messageId) + assertEquals(42L, received.first().threadId) + assertEquals(AttachmentBackfill.FailureReason.NOT_FOUND, received.first().reason) + + collector.cancel() + } + + private fun attachmentFor(messageId: Long): DatabaseAttachment = databaseAttachment(attachmentId = AttachmentId(messageId)) + + private fun nextMessageId(): Long { + val id = nextId++ + AttachmentBackfill.clearPending(id) + return id + } +} diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index ba9ff0dc57..5d745398d9 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -766,6 +766,9 @@ public class SignalServiceMessageSender { content = createDeviceNameChangeContent(message.getDeviceNameChange().get()); } else if (message.getAttachmentBackfillResponse().isPresent()) { content = createAttachmentBackfillResponseContent(message.getAttachmentBackfillResponse().get()); + } else if (message.getAttachmentBackfillRequest().isPresent()) { + content = createAttachmentBackfillRequestContent(message.getAttachmentBackfillRequest().get()); + urgent = true; } else { throw new IOException("Unsupported sync message!"); } @@ -1783,6 +1786,13 @@ public class SignalServiceMessageSender { return container.syncMessage(builder.build()).build(); } + private Content createAttachmentBackfillRequestContent(SyncMessage.AttachmentBackfillRequest proto) { + Content.Builder container = new Content.Builder(); + SyncMessage.Builder builder = createSyncMessageBuilder().attachmentBackfillRequest(proto); + + return container.syncMessage(builder.build()).build(); + } + private SyncMessage.Builder createSyncMessageBuilder() { byte[] padding = Util.getRandomLengthSecretBytes(512); diff --git a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/multidevice/SignalServiceSyncMessage.java b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/multidevice/SignalServiceSyncMessage.java index 53c4251367..450bd175ea 100644 --- a/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/multidevice/SignalServiceSyncMessage.java +++ b/lib/libsignal-service/src/main/java/org/whispersystems/signalservice/api/messages/multidevice/SignalServiceSyncMessage.java @@ -6,12 +6,12 @@ package org.whispersystems.signalservice.api.messages.multidevice; -import org.whispersystems.signalservice.internal.push.SyncMessage; +import org.whispersystems.signalservice.internal.push.SyncMessage.AttachmentBackfillRequest; import org.whispersystems.signalservice.internal.push.SyncMessage.AttachmentBackfillResponse; -import org.whispersystems.signalservice.internal.push.SyncMessage.DeviceNameChange; import org.whispersystems.signalservice.internal.push.SyncMessage.CallEvent; import org.whispersystems.signalservice.internal.push.SyncMessage.CallLinkUpdate; import org.whispersystems.signalservice.internal.push.SyncMessage.CallLogEvent; +import org.whispersystems.signalservice.internal.push.SyncMessage.DeviceNameChange; import java.util.LinkedList; import java.util.List; @@ -40,6 +40,7 @@ public class SignalServiceSyncMessage { private final Optional callLogEvent; private final Optional deviceNameChange; private final Optional attachmentBackfillResponse; + private final Optional attachmentBackfillRequest; private SignalServiceSyncMessage(Optional sent, Optional contacts, @@ -59,7 +60,8 @@ public class SignalServiceSyncMessage { Optional callLinkUpdate, Optional callLogEvent, Optional deviceNameChange, - Optional attachmentBackfillResponse) + Optional attachmentBackfillResponse, + Optional attachmentBackfillRequest) { this.sent = sent; this.contacts = contacts; @@ -80,6 +82,7 @@ public class SignalServiceSyncMessage { this.callLogEvent = callLogEvent; this.deviceNameChange = deviceNameChange; this.attachmentBackfillResponse = attachmentBackfillResponse; + this.attachmentBackfillRequest = attachmentBackfillRequest; } public static SignalServiceSyncMessage forSentTranscript(SentTranscriptMessage sent) { @@ -101,6 +104,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -123,6 +127,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -145,6 +150,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -167,6 +173,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -189,6 +196,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -211,6 +219,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -236,6 +245,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -258,6 +268,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -280,6 +291,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -302,6 +314,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -324,6 +337,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -346,6 +360,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -368,6 +383,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -390,6 +406,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -412,6 +429,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -434,6 +452,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -456,6 +475,7 @@ public class SignalServiceSyncMessage { Optional.of(callLinkUpdate), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -478,6 +498,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.of(callLogEvent), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -500,6 +521,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.of(deviceNameChange), + Optional.empty(), Optional.empty()); } @@ -522,7 +544,31 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), - Optional.of(backfillResponse)); + Optional.of(backfillResponse), + Optional.empty()); + } + + public static SignalServiceSyncMessage forAttachmentBackfillRequest(@Nonnull AttachmentBackfillRequest backfillRequest) { + return new SignalServiceSyncMessage(Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.of(backfillRequest)); } public static SignalServiceSyncMessage empty() { @@ -544,6 +590,7 @@ public class SignalServiceSyncMessage { Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } @@ -623,6 +670,10 @@ public class SignalServiceSyncMessage { return attachmentBackfillResponse; } + public Optional getAttachmentBackfillRequest() { + return attachmentBackfillRequest; + } + public enum FetchType { LOCAL_PROFILE, STORAGE_MANIFEST,