Add request backfill attachment support for linked devices.

This commit is contained in:
Cody Henthorne
2026-06-18 15:12:44 -04:00
committed by Greyson Parrelli
parent 83cb48d119
commit c4846d92da
21 changed files with 1863 additions and 100 deletions
@@ -0,0 +1,338 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.messages
import androidx.test.ext.junit.runners.AndroidJUnit4
import assertk.assertThat
import assertk.assertions.isEqualTo
import assertk.assertions.isNotNull
import okio.ByteString.Companion.toByteString
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.signal.core.models.database.AttachmentId
import org.signal.core.util.Base64
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.MessageTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.testing.MessageContentFuzzer
import org.thoughtcrime.securesms.testing.SignalActivityRule
import org.thoughtcrime.securesms.util.MediaUtil
import org.whispersystems.signalservice.api.push.SignalServiceAddress
import org.whispersystems.signalservice.internal.push.AddressableMessage
import org.whispersystems.signalservice.internal.push.AttachmentPointer
import org.whispersystems.signalservice.internal.push.Content
import org.whispersystems.signalservice.internal.push.ConversationIdentifier
import org.whispersystems.signalservice.internal.push.DataMessage
import org.whispersystems.signalservice.internal.push.SyncMessage
import java.util.UUID
@Suppress("ClassName")
@RunWith(AndroidJUnit4::class)
class SyncMessageProcessorTest_attachmentBackfill {
@get:Rule
val harness = SignalActivityRule(createGroup = true)
private lateinit var messageHelper: MessageHelper
private var originalDeviceId: Int = SignalServiceAddress.DEFAULT_DEVICE_ID
@Before
fun setUp() {
messageHelper = MessageHelper(harness)
originalDeviceId = SignalStore.account.deviceId
// Make this device a linked device so backfill response handling activates.
SignalStore.account.deviceId = 2
}
@After
fun tearDown() {
SignalStore.account.deviceId = originalDeviceId
messageHelper.tearDown()
}
@Test
fun fresh_pointer_updates_row_and_resets_transfer_state() {
val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice)
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
val pointer = freshPointer(cdnNumber = 3, cdnKey = "fresh-key", size = 1234, uploadTimestamp = 9_999_000L)
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = pointer))
)
// transferState is not asserted: the forced download job's onAdded() races it PENDING -> STARTED. The pointer fields
// are written synchronously and are stable.
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single()
assertThat(refreshed.remoteLocation).isEqualTo("fresh-key")
assertThat(refreshed.cdn.cdnNumber).isEqualTo(3)
assertThat(refreshed.size).isEqualTo(1234L)
assertThat(refreshed.uploadTimestamp).isEqualTo(9_999_000L)
}
@Test
fun terminal_error_marks_permanent_failure() {
val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice)
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(
SyncMessage.AttachmentBackfillResponse.AttachmentData(
status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR
)
)
)
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single()
assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE)
}
@Test
fun pending_status_leaves_row_unchanged() {
val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice)
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(
SyncMessage.AttachmentBackfillResponse.AttachmentData(
status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.PENDING
)
)
)
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single()
assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_FAILED)
}
@Test
fun message_not_found_error_marks_attachments_retryable_failed() {
val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice)
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
error = SyncMessage.AttachmentBackfillResponse.Error.MESSAGE_NOT_FOUND
)
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single()
assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_FAILED)
}
@Test
fun primary_device_ignores_backfill_response() {
SignalStore.account.deviceId = SignalServiceAddress.DEFAULT_DEVICE_ID
val (messageId, attachmentId) = insertIncomingMediaMessage(messageHelper.alice)
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(
SyncMessage.AttachmentBackfillResponse.AttachmentData(
status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR
)
)
)
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single()
assertThat(refreshed.transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_FAILED)
}
@Test
fun multi_attachment_response_matches_positionally_with_mixed_status() {
val messageId = insertIncomingMessageWith(messageHelper.alice, listOf(incomingImagePointer(), incomingImagePointer()))
val body = SignalDatabase.attachments.getAttachmentsForMessage(messageId).sortedBy { it.displayOrder }
assertThat(body.size).isEqualTo(2)
body.forEach { SignalDatabase.attachments.setTransferProgressFailed(it.attachmentId, messageId) }
// Response is a positional array: index 0 -> body[0] (fresh pointer), index 1 -> body[1] (terminal).
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(
SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "first-key", size = 11, uploadTimestamp = 111L)),
SyncMessage.AttachmentBackfillResponse.AttachmentData(status = SyncMessage.AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR)
)
)
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).sortedBy { it.displayOrder }
// remoteLocation proves index 0 routed to body[0]. transferState is not asserted: it races the download job's onAdded().
assertThat(refreshed[0].remoteLocation).isEqualTo("first-key")
assertThat(refreshed[0].cdn.cdnNumber).isEqualTo(3)
assertThat(refreshed[1].transferState).isEqualTo(AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE)
}
@Test
fun long_text_slot_is_applied_independently_of_the_body() {
val messageId = insertIncomingMessageWith(messageHelper.alice, listOf(incomingImagePointer(), incomingLongTextPointer()))
val all = SignalDatabase.attachments.getAttachmentsForMessage(messageId)
all.forEach { SignalDatabase.attachments.setTransferProgressFailed(it.attachmentId, messageId) }
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "body-key", size = 22, uploadTimestamp = 222L))),
longText = SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "long-text-key", size = 33, uploadTimestamp = 333L))
)
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId)
val bodyRow = refreshed.single { it.contentType != MediaUtil.LONG_TEXT }
val longTextRow = refreshed.single { it.contentType == MediaUtil.LONG_TEXT }
// The positional `attachments` array fills the body row and the separate `longText` slot fills the long-text row,
// with no cross-contamination. transferState is not asserted: it races the download job's onAdded().
assertThat(bodyRow.remoteLocation).isEqualTo("body-key")
assertThat(longTextRow.remoteLocation).isEqualTo("long-text-key")
}
@Test
fun remote_attachment_list_longer_than_local_skips_extras() {
val messageId = insertIncomingMessageWith(messageHelper.alice, listOf(incomingImagePointer()))
val attachmentId = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single().attachmentId
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
deliverBackfillResponse(
sender = messageHelper.alice,
sentTimestamp = sentTimestampFor(messageId),
conversationId = messageHelper.alice,
attachmentData = listOf(
SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "only-key", size = 44, uploadTimestamp = 444L)),
SyncMessage.AttachmentBackfillResponse.AttachmentData(attachment = freshPointer(cdnNumber = 3, cdnKey = "extra-key", size = 55, uploadTimestamp = 555L))
)
)
// The single local row is routed from index 0; the extra index-1 entry has no body[1] and must be skipped, not throw.
val refreshed = SignalDatabase.attachments.getAttachmentsForMessage(messageId).single()
assertThat(refreshed.remoteLocation).isEqualTo("only-key")
}
private fun insertIncomingMediaMessage(sender: RecipientId): Pair<Long, AttachmentId> {
messageHelper.startTime = messageHelper.nextStartTime()
val sentTimestamp = messageHelper.startTime
val content = Content.Builder()
.dataMessage(
DataMessage.Builder()
.timestamp(sentTimestamp)
.attachments(listOf(MessageContentFuzzer.attachmentPointer()))
.build()
)
.build()
messageHelper.processor.process(
envelope = MessageContentFuzzer.envelope(sentTimestamp),
content = content,
metadata = MessageContentFuzzer.envelopeMetadata(source = sender, destination = harness.self.id),
serverDeliveredTimestamp = sentTimestamp + 10
)
val syncMessageId = MessageTable.SyncMessageId(sender, sentTimestamp)
val messageId = SignalDatabase.messages.getMessageIdOrNull(syncMessageId)
assertThat(messageId, name = "messageId").isNotNull()
val attachment = SignalDatabase.attachments.getAttachmentsForMessage(messageId!!).single()
return messageId to attachment.attachmentId
}
private fun insertIncomingMessageWith(sender: RecipientId, pointers: List<AttachmentPointer>): Long {
messageHelper.startTime = messageHelper.nextStartTime()
val sentTimestamp = messageHelper.startTime
val content = Content.Builder()
.dataMessage(
DataMessage.Builder()
.timestamp(sentTimestamp)
.attachments(pointers)
.build()
)
.build()
messageHelper.processor.process(
envelope = MessageContentFuzzer.envelope(sentTimestamp),
content = content,
metadata = MessageContentFuzzer.envelopeMetadata(source = sender, destination = harness.self.id),
serverDeliveredTimestamp = sentTimestamp + 10
)
val messageId = SignalDatabase.messages.getMessageIdOrNull(MessageTable.SyncMessageId(sender, sentTimestamp))
assertThat(messageId, name = "messageId").isNotNull()
return messageId!!
}
private fun incomingImagePointer(): AttachmentPointer = MessageContentFuzzer.attachmentPointer().newBuilder().contentType("image/jpeg").build()
private fun incomingLongTextPointer(): AttachmentPointer = MessageContentFuzzer.attachmentPointer().newBuilder().contentType(MediaUtil.LONG_TEXT).build()
private fun sentTimestampFor(messageId: Long): Long {
return SignalDatabase.messages.getMessageRecord(messageId).dateSent
}
private fun deliverBackfillResponse(
sender: RecipientId,
sentTimestamp: Long,
conversationId: RecipientId,
attachmentData: List<SyncMessage.AttachmentBackfillResponse.AttachmentData> = emptyList(),
longText: SyncMessage.AttachmentBackfillResponse.AttachmentData? = null,
error: SyncMessage.AttachmentBackfillResponse.Error? = null
) {
messageHelper.startTime = messageHelper.nextStartTime()
val envelopeTimestamp = messageHelper.startTime
val response = SyncMessage.AttachmentBackfillResponse(
targetMessage = AddressableMessage(
authorServiceIdBinary = Recipient.resolved(sender).requireAci().toByteString(),
sentTimestamp = sentTimestamp
),
targetConversation = ConversationIdentifier(
threadServiceIdBinary = Recipient.resolved(conversationId).requireAci().toByteString()
),
attachments = if (error == null) SyncMessage.AttachmentBackfillResponse.AttachmentDataList(attachments = attachmentData, longText = longText) else null,
error = error
)
val content = Content.Builder()
.syncMessage(SyncMessage.Builder().attachmentBackfillResponse(response).build())
.build()
messageHelper.processor.process(
envelope = MessageContentFuzzer.envelope(envelopeTimestamp, serverGuid = UUID.randomUUID()),
content = content,
metadata = MessageContentFuzzer.envelopeMetadata(source = harness.self.id, destination = harness.self.id, sourceDeviceId = 1),
serverDeliveredTimestamp = envelopeTimestamp + 10
)
}
private fun freshPointer(cdnNumber: Int, cdnKey: String, size: Int, uploadTimestamp: Long): AttachmentPointer {
return AttachmentPointer.Builder()
.cdnKey(cdnKey)
.cdnNumber(cdnNumber)
.key(Base64.decode("AAAAAAAA").toByteString())
.digest(ByteArray(32) { it.toByte() }.toByteString())
.size(size)
.uploadTimestamp(uploadTimestamp)
.contentType("image/jpeg")
.build()
}
}
@@ -39,6 +39,7 @@ import org.thoughtcrime.securesms.audio.AudioWaveForms;
import org.thoughtcrime.securesms.components.voice.VoiceNotePlaybackState;
import org.thoughtcrime.securesms.database.AttachmentTable;
import org.thoughtcrime.securesms.events.PartProgressEvent;
import org.thoughtcrime.securesms.jobs.AttachmentBackfill;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.mms.AudioSlide;
import org.thoughtcrime.securesms.mms.SlideClickListener;
@@ -82,9 +83,11 @@ public final class AudioView extends FrameLayout {
private boolean isPlaying;
private long durationMillis;
private AudioSlide audioSlide;
private boolean showControls;
private Callbacks callbacks;
private Disposable disposable = Disposable.disposed();
private Disposable disposable = Disposable.disposed();
private Disposable awaitingDisposable = Disposable.disposed();
private final Observer<VoiceNotePlaybackState> playbackStateObserver = this::onPlaybackState;
@@ -159,6 +162,12 @@ public final class AudioView extends FrameLayout {
protected void onAttachedToWindow() {
super.onAttachedToWindow();
if (!EventBus.getDefault().isRegistered(this)) EventBus.getDefault().register(this);
awaitingDisposable = AttachmentBackfill.awaitingChanges()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(ignored -> {
if (audioSlide != null) presentTransferControls(audioSlide, showControls);
}, t -> Log.w(TAG, "Error observing backfill awaiting state.", t));
}
@Override
@@ -166,6 +175,7 @@ public final class AudioView extends FrameLayout {
super.onDetachedFromWindow();
EventBus.getDefault().unregister(this);
disposable.dispose();
awaitingDisposable.dispose();
}
public void setProgressAndPlayBackgroundTint(@ColorInt int color) {
@@ -197,27 +207,8 @@ public final class AudioView extends FrameLayout {
}
}
if (showControls && audio.isPendingDownload()) {
controlToggle.displayQuick(downloadContainer);
seekBar.setEnabled(false);
downloadButton.setOnClickListener(new DownloadClickedListener(audio));
if (circleProgress != null) {
if (circleProgress.isSpinning()) circleProgress.stopSpinning();
circleProgress.setVisibility(View.GONE);
}
} else if (showControls && audio.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED) {
controlToggle.displayQuick(progressAndPlay);
seekBar.setEnabled(false);
showPlayButton();
if (circleProgress != null) {
circleProgress.setVisibility(View.VISIBLE);
circleProgress.spin();
}
} else {
seekBar.setEnabled(true);
if (circleProgress != null && circleProgress.isSpinning()) circleProgress.stopSpinning();
showPlayButton();
}
this.showControls = showControls;
presentTransferControls(audio, showControls);
if (seekBar instanceof WaveFormSeekBarView) {
WaveFormSeekBarView waveFormView = (WaveFormSeekBarView) seekBar;
@@ -263,6 +254,38 @@ public final class AudioView extends FrameLayout {
}
}
private void presentTransferControls(@NonNull AudioSlide audio, boolean showControls) {
DatabaseAttachment dbAttachment = audio.asAttachment() instanceof DatabaseAttachment ? (DatabaseAttachment) audio.asAttachment() : null;
if (dbAttachment != null && dbAttachment.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId)) {
AttachmentBackfill.onAttachmentTerminal(dbAttachment.attachmentId, dbAttachment.mmsId);
}
boolean awaitingBackfill = dbAttachment != null && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId);
if (showControls && audio.isPendingDownload() && !awaitingBackfill) {
controlToggle.displayQuick(downloadContainer);
seekBar.setEnabled(false);
downloadButton.setOnClickListener(new DownloadClickedListener(audio));
if (circleProgress != null) {
if (circleProgress.isSpinning()) circleProgress.stopSpinning();
circleProgress.setVisibility(View.GONE);
}
} else if (showControls && (audio.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED || awaitingBackfill)) {
controlToggle.displayQuick(progressAndPlay);
seekBar.setEnabled(false);
showPlayButton();
if (circleProgress != null) {
circleProgress.setVisibility(View.VISIBLE);
if (!circleProgress.isSpinning()) circleProgress.spin();
}
} else {
seekBar.setEnabled(true);
if (circleProgress != null && circleProgress.isSpinning()) circleProgress.stopSpinning();
showPlayButton();
}
}
public void setDownloadClickListener(@Nullable SlideClickListener listener) {
this.downloadListener = listener;
}
@@ -28,6 +28,7 @@ import org.thoughtcrime.securesms.R;
import org.thoughtcrime.securesms.attachments.DatabaseAttachment;
import org.thoughtcrime.securesms.database.AttachmentTable;
import org.thoughtcrime.securesms.events.PartProgressEvent;
import org.thoughtcrime.securesms.jobs.AttachmentBackfill;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.mms.Slide;
import org.thoughtcrime.securesms.mms.SlideClickListener;
@@ -36,6 +37,9 @@ import org.whispersystems.signalservice.api.util.OptionalUtil;
import java.util.Collections;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.disposables.Disposable;
public class DocumentView extends FrameLayout {
private static final String TAG = Log.tag(DocumentView.class);
@@ -55,6 +59,9 @@ public class DocumentView extends FrameLayout {
private @Nullable SlidesClickedListener cancelTransferClickListener;
private @Nullable SlidesClickedListener resendTransferClickListener;
private @Nullable Slide documentSlide;
private boolean showControls;
private Disposable awaitingDisposable = Disposable.disposed();
public DocumentView(@NonNull Context context) {
this(context, null);
@@ -98,12 +105,19 @@ public class DocumentView extends FrameLayout {
if (!EventBus.getDefault().isRegistered(this)) {
EventBus.getDefault().register(this);
}
awaitingDisposable = AttachmentBackfill.awaitingChanges()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(ignored -> {
if (documentSlide != null) presentTransferControls(documentSlide, showControls);
}, t -> Log.w(TAG, "Error observing backfill awaiting state.", t));
}
@Override
protected void onDetachedFromWindow() {
super.onDetachedFromWindow();
EventBus.getDefault().unregister(this);
awaitingDisposable.dispose();
}
public void setDownloadClickListener(@Nullable SlideClickListener listener) {
@@ -126,22 +140,8 @@ public class DocumentView extends FrameLayout {
final boolean showControls,
final boolean showSingleLineFilename)
{
if (showControls && documentSlide.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED) {
controlToggle.displayQuick(stopUploadButton);
downloadProgress.spin();
stopUploadButton.setOnClickListener(new CancelTransferListener(documentSlide));
} else if (showControls && documentSlide.getUri() != null && documentSlide.isPendingDownload()) {
controlToggle.displayQuick(uploadButton);
uploadButton.setOnClickListener(new ResendTransferClickListener(documentSlide));
if (downloadProgress.isSpinning()) downloadProgress.stopSpinning();
} else if (showControls && documentSlide.getUri() == null && documentSlide.isPendingDownload()) {
controlToggle.displayQuick(downloadButton);
downloadButton.setOnClickListener(new DownloadClickedListener(documentSlide));
if (downloadProgress.isSpinning()) downloadProgress.stopSpinning();
} else {
controlToggle.displayQuick(iconContainer);
if (downloadProgress.isSpinning()) downloadProgress.stopSpinning();
}
this.showControls = showControls;
presentTransferControls(documentSlide, showControls);
this.documentSlide = documentSlide;
@@ -172,6 +172,33 @@ public class DocumentView extends FrameLayout {
}
}
private void presentTransferControls(@NonNull Slide documentSlide, boolean showControls) {
DatabaseAttachment dbAttachment = documentSlide.asAttachment() instanceof DatabaseAttachment ? (DatabaseAttachment) documentSlide.asAttachment() : null;
if (dbAttachment != null && dbAttachment.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId)) {
AttachmentBackfill.onAttachmentTerminal(dbAttachment.attachmentId, dbAttachment.mmsId);
}
boolean awaitingBackfill = dbAttachment != null && AttachmentBackfill.isAwaitingBackfill(dbAttachment.attachmentId);
if (showControls && (documentSlide.getTransferState() == AttachmentTable.TRANSFER_PROGRESS_STARTED || awaitingBackfill)) {
controlToggle.displayQuick(stopUploadButton);
if (!downloadProgress.isSpinning()) downloadProgress.spin();
stopUploadButton.setOnClickListener(awaitingBackfill ? null : new CancelTransferListener(documentSlide));
} else if (showControls && documentSlide.getUri() != null && documentSlide.isPendingDownload()) {
controlToggle.displayQuick(uploadButton);
uploadButton.setOnClickListener(new ResendTransferClickListener(documentSlide));
if (downloadProgress.isSpinning()) downloadProgress.stopSpinning();
} else if (showControls && documentSlide.getUri() == null && documentSlide.isPendingDownload()) {
controlToggle.displayQuick(downloadButton);
downloadButton.setOnClickListener(new DownloadClickedListener(documentSlide));
if (downloadProgress.isSpinning()) downloadProgress.stopSpinning();
} else {
controlToggle.displayQuick(iconContainer);
if (downloadProgress.isSpinning()) downloadProgress.stopSpinning();
}
}
@Override
public void setFocusable(boolean focusable) {
super.setFocusable(focusable);
@@ -12,9 +12,14 @@ import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.setValue
import androidx.compose.ui.platform.AbstractComposeView
import androidx.compose.ui.platform.ViewCompositionStrategy
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import org.greenrobot.eventbus.EventBus
import org.greenrobot.eventbus.Subscribe
import org.greenrobot.eventbus.ThreadMode
import org.signal.core.models.database.AttachmentId
import org.signal.core.ui.compose.theme.SignalTheme
import org.signal.core.util.ByteSize
import org.signal.core.util.ThrottledDebouncer
@@ -25,6 +30,7 @@ import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.components.RecyclerViewParentTransitionController
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.events.PartProgressEvent
import org.thoughtcrime.securesms.jobs.AttachmentBackfill
import org.thoughtcrime.securesms.mms.Slide
import org.thoughtcrime.securesms.util.MediaUtil
import java.util.UUID
@@ -48,6 +54,11 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att
private val progressUpdateDebouncer = ThrottledDebouncer(100)
private var previousRenderState: TransferControlsRenderState = TransferControlsRenderState.Gone
/** Active while view is attached */
private var renderScope: CoroutineScope? = null
/** Per-instance id so a single recycled view can be isolated in logcat when [VERBOSE_DEVELOPMENT_LOGGING] is on. */
private val viewId by lazy { UUID.randomUUID().toString().take(8) }
@@ -72,39 +83,56 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att
override fun onAttachedToWindow() {
super.onAttachedToWindow()
if (!EventBus.getDefault().isRegistered(this)) EventBus.getDefault().register(this)
renderScope = CoroutineScope(Dispatchers.Main.immediate).also { scope ->
scope.launch {
AttachmentBackfill.awaiting.collect { renderState() }
}
}
}
override fun onDetachedFromWindow() {
super.onDetachedFromWindow()
EventBus.getDefault().unregister(this)
renderScope?.cancel()
renderScope = null
}
fun isGone(): Boolean {
return TransferControls.deriveRenderState(state) is TransferControlsRenderState.Gone
val state = TransferControls.deriveRenderState(state = state, awaitingAttachmentIds = state.slides.attachmentIdsAwaitingBackfill())
return state is TransferControlsRenderState.Gone
}
private fun updateState(stateFactory: (TransferControlViewState) -> TransferControlViewState) {
val newState = stateFactory(state)
state = stateFactory(state)
renderState()
}
val oldRender = TransferControls.deriveRenderState(state)
val newRender = TransferControls.deriveRenderState(newState)
state = newState
/**
* Derives and applies the render state by combining [state] with the current awaiting attachments. Both the state and
* awaiting paths change events funnel through here.
*/
private fun renderState() {
val oldRenderState = previousRenderState
val newRenderState = TransferControls.deriveRenderState(state, state.slides.attachmentIdsAwaitingBackfill())
previousRenderState = newRenderState
if (oldRender == newRender) {
if (oldRenderState == newRenderState) {
return
}
verboseLog { "render $oldRender -> $newRender slides=[${slidesAsLogString(newState.slides)}]" }
verboseLog { "render $oldRenderState -> $newRenderState slides=[${slidesAsLogString(state.slides)}]" }
if (oldRender is TransferControlsRenderState.InProgress && oldRender.isProgressOnlyDifference(newRender)) {
// Only throttle noisy progress changes
if (oldRenderState is TransferControlsRenderState.InProgress && oldRenderState.isProgressOnlyDifference(newRenderState)) {
progressUpdateDebouncer.publish {
renderState = newRender
renderState = newRenderState
visibility = VISIBLE
}
} else {
progressUpdateDebouncer.clear()
renderState = newRender
if (newRender !is TransferControlsRenderState.Gone) {
renderState = newRenderState
if (newRenderState !is TransferControlsRenderState.Gone) {
visibility = VISIBLE
}
}
@@ -133,6 +161,7 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att
fun setSlides(slides: List<Slide>) {
require(slides.isNotEmpty()) { "Must provide at least one slide." }
clearResolvedBackfills(slides)
updateState { state ->
val isNewSlideSet = !isUpdateToExistingSet(state, slides)
val networkProgress: MutableMap<Attachment, Progress> = if (isNewSlideSet) HashMap() else state.networkProgress.toMutableMap()
@@ -201,10 +230,6 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att
updateState { it.copy(isVisible = isVisible) }
}
fun setAwaitingPrimaryResponse(awaiting: Boolean) {
updateState { it.copy(awaitingPrimaryResponse = awaiting) }
}
override fun setFocusable(focusable: Boolean) {
super.setFocusable(false)
updateState { it.copy(isFocusable = focusable) }
@@ -215,6 +240,22 @@ class TransferControlView @JvmOverloads constructor(context: Context, attrs: Att
updateState { it.copy(isClickable = clickable) }
}
private fun List<Slide>.attachmentIdsAwaitingBackfill(): Set<AttachmentId> {
return this
.mapNotNullTo(HashSet()) { (it.asAttachment() as? DatabaseAttachment)?.attachmentId }
.apply { retainAll(AttachmentBackfill.awaiting.value) }
}
/** Tells [AttachmentBackfill] to stop awaiting any backfilled attachment that this view now sees as DONE. */
private fun clearResolvedBackfills(slides: List<Slide>) {
for (slide in slides) {
val attachment = slide.asAttachment() as? DatabaseAttachment ?: continue
if (attachment.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE && attachment.attachmentId in AttachmentBackfill.awaiting.value) {
AttachmentBackfill.onAttachmentTerminal(attachment.attachmentId, attachment.mmsId)
}
}
}
private fun MutableMap<Attachment, Progress>.applyProgress(attachment: Attachment, update: Progress) {
if (update.completed < 0.bytes) {
remove(attachment)
@@ -21,6 +21,5 @@ data class TransferControlViewState(
val networkProgress: Map<Attachment, TransferControlView.Progress> = HashMap(),
val compressionProgress: Map<Attachment, TransferControlView.Progress> = HashMap(),
val playableWhileDownloading: Boolean = false,
val isUpload: Boolean = false,
val awaitingPrimaryResponse: Boolean = false
val isUpload: Boolean = false
)
@@ -5,9 +5,11 @@
package org.thoughtcrime.securesms.components.transfercontrols
import org.signal.core.models.database.AttachmentId
import org.signal.core.util.ByteSize
import org.signal.core.util.bytes
import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.mms.Slide
import org.thoughtcrime.securesms.util.MediaUtil
@@ -42,7 +44,10 @@ object TransferControls {
data class Bytes(val completed: ByteSize, val total: ByteSize) : ProgressLabel
}
fun deriveRenderState(state: TransferControlViewState): TransferControlsRenderState {
fun deriveRenderState(
state: TransferControlViewState,
awaitingAttachmentIds: Set<AttachmentId> = emptySet()
): TransferControlsRenderState {
if (state.slides.isEmpty()) {
return TransferControlsRenderState.Gone
}
@@ -55,14 +60,17 @@ object TransferControls {
return TransferControlsRenderState.Gone
}
if (state.awaitingPrimaryResponse) {
// If any attachments are being backfilled, overwrite with in progress state to maintain spinner
val awaitingBackfill = state.slides.any { (it.asAttachment() as? DatabaseAttachment)?.attachmentId in awaitingAttachmentIds }
if (awaitingBackfill) {
val downloading = state.slides.any { it.transferState == AttachmentTable.TRANSFER_PROGRESS_STARTED }
return TransferControlsRenderState.InProgress(
isUpload = false,
placement = if (state.slides.size == 1) Placement.CENTER else Placement.CORNER,
progress = null,
progress = if (downloading) calculateProgress(state) else null,
showPlayButton = false,
cancelable = false,
label = null
cancelable = downloading,
label = if (downloading) progressLabel(state) else null
)
}
@@ -173,10 +181,6 @@ object TransferControls {
return weightedProgress / weightedTotal
}
/**
* Internal, view-free mirror of the legacy state machine. Kept verbatim from the original view to preserve behavior; the
* resulting [Mode] is mapped to a [TransferControlsRenderState] by [deriveRenderState].
*/
private fun deriveMode(state: TransferControlViewState): Mode {
if (state.slides.isEmpty()) {
return Mode.GONE
@@ -279,9 +283,6 @@ object TransferControls {
private const val UPLOAD_TASK_WEIGHT = 1
/**
* A weighting compared to [UPLOAD_TASK_WEIGHT]
*/
private const val COMPRESSION_TASK_WEIGHT = 3
private enum class Mode {
@@ -281,6 +281,7 @@ import org.thoughtcrime.securesms.groups.ui.migration.GroupsV1MigrationInfoBotto
import org.thoughtcrime.securesms.groups.ui.migration.GroupsV1MigrationSuggestionsDialog
import org.thoughtcrime.securesms.groups.v2.GroupBlockJoinRequestResult
import org.thoughtcrime.securesms.invites.InviteActions
import org.thoughtcrime.securesms.jobs.AttachmentBackfill
import org.thoughtcrime.securesms.jobs.ServiceOutageDetectionJob
import org.thoughtcrime.securesms.keyboard.KeyboardPage
import org.thoughtcrime.securesms.keyboard.KeyboardPagerFragment
@@ -1408,6 +1409,16 @@ class ConversationFragment :
}
}
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.RESUMED) {
AttachmentBackfill.failures.collect { failure ->
if (failure.threadId == args.threadId) {
showAttachmentBackfillFailureDialog(failure.reason)
}
}
}
}
if (TextSecurePreferences.getServiceOutage(context)) {
AppDependencies.jobManager.add(ServiceOutageDetectionJob())
}
@@ -3104,6 +3115,19 @@ class ConversationFragment :
dialogBuilder.show()
}
private fun showAttachmentBackfillFailureDialog(reason: AttachmentBackfill.FailureReason) {
val messageRes = when (reason) {
AttachmentBackfill.FailureReason.TIMEOUT -> R.string.ConversationFragment_attachment_backfill_timeout
AttachmentBackfill.FailureReason.NOT_FOUND -> R.string.ConversationFragment_attachment_backfill_not_found
}
MaterialAlertDialogBuilder(requireContext())
.setTitle(R.string.ConversationFragment_attachment_backfill_failed_title)
.setMessage(messageRes)
.setPositiveButton(android.R.string.ok, null)
.show()
}
private fun handleDisplayDetails(conversationMessage: ConversationMessage) {
val recipientSnapshot = viewModel.recipientSnapshot ?: return
chatRouter.goToChatDetail(MainNavigationDetailLocation.Chats.MessageDetails(recipientSnapshot.id, MessageId(conversationMessage.messageRecord.id)))
@@ -1723,6 +1723,50 @@ class AttachmentTable(
}
}
/**
* Replaces the remote pointer fields on [attachmentId] with the values from a primary device's
* backfill response and resets the transfer state to pending so a fresh download can run.
*/
fun updatePointerFromBackfill(attachmentId: AttachmentId, attachment: Attachment): Boolean {
val remoteLocation = attachment.remoteLocation
if (remoteLocation.isNullOrBlank()) {
Log.w(TAG, "[updatePointerFromBackfill] Attachment has no remote location for $attachmentId.")
return false
}
val remoteKey = attachment.remoteKey
if (remoteKey.isNullOrBlank()) {
Log.w(TAG, "[updatePointerFromBackfill] Attachment missing key for $attachmentId.")
return false
}
val remoteDigest = attachment.remoteDigest
if (remoteDigest == null) {
Log.w(TAG, "[updatePointerFromBackfill] Attachment missing digest for $attachmentId.")
return false
}
val values = contentValuesOf(
TRANSFER_STATE to TRANSFER_PROGRESS_PENDING,
CDN_NUMBER to attachment.cdn.cdnNumber,
REMOTE_LOCATION to remoteLocation,
REMOTE_KEY to remoteKey,
REMOTE_DIGEST to remoteDigest,
REMOTE_INCREMENTAL_DIGEST to attachment.incrementalDigest,
REMOTE_INCREMENTAL_DIGEST_CHUNK_SIZE to attachment.incrementalMacChunkSize,
DATA_SIZE to attachment.size,
UPLOAD_TIMESTAMP to attachment.uploadTimestamp
)
val updateCount = writableDatabase
.update(TABLE_NAME)
.values(values)
.where("$ID = ?", attachmentId.id)
.run()
return updateCount > 0
}
/**
* Updates the attachment (and all attachments that share the same data file) with a new length.
*/
@@ -0,0 +1,289 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import androidx.annotation.VisibleForTesting
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx3.asObservable
import org.signal.core.models.database.AttachmentId
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.MessageRecord
import org.thoughtcrime.securesms.database.model.MmsMessageRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.util.MediaUtil
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
/**
* Orchestrates outbound attachment backfill requests and the awaiting-state that drives the UI controls.
*
* [awaiting] is keyed by [AttachmentId] and combined with the slides in [TransferControlView]. The flag is set in
* [maybeRequest] and cleared in [onAttachmentTerminal] when the backfill download terminates in failure or is rendered
* in success.
*
* Dedup, timeout, threadId, and the failure dialog are per-message.
*/
object AttachmentBackfill {
private val TAG = Log.tag(AttachmentBackfill::class)
/** Suppress duplicate requests for the same message; bounded so a lost request can be re-driven by a later retry. */
private val DEDUP_REQUEST_BACKFILL_WINDOW = 30.seconds.inWholeMilliseconds
/** Time to wait for an initial response from the primary before showing a check network error */
private val INITIAL_RESPONSE_TIMEOUT = 30.seconds
/** Hard stop to wait on a primary acknowledged pending attachment before showing an error */
private val PENDING_RESPONSE_TIMEOUT = 2.minutes
private val messageBackfillLastRequestedAt: ConcurrentHashMap<Long, Long> = ConcurrentHashMap()
private val messageThreads: ConcurrentHashMap<Long, Long> = ConcurrentHashMap()
private val messageTimeoutJobs: ConcurrentHashMap<Long, Job> = ConcurrentHashMap()
private val messageAttachments: ConcurrentHashMap<Long, Set<AttachmentId>> = ConcurrentHashMap()
private val _awaiting = MutableStateFlow<Set<AttachmentId>>(emptySet())
/** Attachments currently awaiting a primary backfill (request sent, not yet re-downloaded) */
val awaiting: StateFlow<Set<AttachmentId>> = _awaiting.asStateFlow()
private val _failures = MutableSharedFlow<BackfillFailure>(extraBufferCapacity = 16)
/** One-shot stream of backfill failures for a foreground surface to react to (e.g. show a dialog). */
val failures: SharedFlow<BackfillFailure> = _failures.asSharedFlow()
/** The scope the response timeouts run on. Overridable so tests can drive the timeout with virtual time. */
@VisibleForTesting
internal var timeoutScope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
/**
* Considers a backfill request for [messageId] after a permanent download failure. The attachment is flagged
* [awaiting] even when the request send is deduped, so each failed part of a multi-attachment message marks itself.
*/
@JvmStatic
fun maybeRequest(messageId: Long, attachment: DatabaseAttachment) {
if (SignalStore.account.isPrimaryDevice) {
return
}
val record = SignalDatabase.messages.getMessageRecordOrNull(messageId)
if (record == null) {
Log.d(TAG, "[$messageId] Skipping backfill: message missing.")
return
}
if (!isEligibleForBackfill(record, attachment)) {
Log.d(TAG, "[$messageId] Skipping backfill request for ineligible attachment kind.")
return
}
markAwaiting(messageId, attachment.attachmentId)
val now = System.currentTimeMillis()
val previous = messageBackfillLastRequestedAt[messageId]
if (previous != null && now - previous < DEDUP_REQUEST_BACKFILL_WINDOW) {
Log.d(TAG, "[$messageId] Request already sent ${now - previous} ms ago; attachment flagged, not re-requesting.")
return
}
messageBackfillLastRequestedAt[messageId] = now
Log.i(TAG, "[$messageId] Enqueuing attachment backfill request.")
AppDependencies.jobManager.add(MultiDeviceAttachmentBackfillRequestJob(messageId))
startTimeout(messageId, record.threadId, INITIAL_RESPONSE_TIMEOUT)
}
/**
* Manages only the response timeout: extend it while the primary still reports work pending, else cancel it.
*/
@JvmStatic
fun onResponseProcessed(messageId: Long, anyPending: Boolean) {
if (anyPending) {
val threadId = messageThreads[messageId]
if (threadId == null) {
Log.d(TAG, "[$messageId] Pending response for a message we're no longer tracking; ignoring.")
return
}
Log.i(TAG, "[$messageId] Backfill response still pending; extending timeout.")
startTimeout(messageId, threadId, PENDING_RESPONSE_TIMEOUT)
} else {
Log.i(TAG, "[$messageId] Backfill response final; awaiting re-download(s).")
cancelTimeout(messageId)
}
}
/**
* Stop awaiting an attachment whose backfill reached a terminal error state or attachment has been rendered.
* Only drops the message's bookkeeping once none of its attachments are awaiting.
*/
@JvmStatic
fun onAttachmentTerminal(attachmentId: AttachmentId, messageId: Long) {
if (!_awaiting.value.contains(attachmentId)) {
return
}
Log.i(TAG, "[$messageId] Backfill resolved for $attachmentId.")
_awaiting.update { it - attachmentId }
val remaining = messageAttachments.computeIfPresent(messageId) { _, set ->
(set - attachmentId).ifEmpty { null }
}
if (remaining == null) {
clearMessage(messageId)
}
}
/** Primary could not find the message; stop awaiting all of its attachments and surface a not-found failure. */
@JvmStatic
fun onResponseMessageNotFound(messageId: Long, threadId: Long) {
Log.w(TAG, "[$messageId] Backfill response reported message not found.")
clearMessageAndAwaiting(messageId)
_failures.tryEmit(BackfillFailure(messageId, threadId, FailureReason.NOT_FOUND))
}
@JvmStatic
fun clearPending(messageId: Long) {
clearMessageAndAwaiting(messageId)
}
private fun markAwaiting(messageId: Long, attachmentId: AttachmentId) {
messageAttachments.merge(messageId, setOf(attachmentId)) { old, added -> old + added }
_awaiting.update { it + attachmentId }
}
private fun startTimeout(messageId: Long, threadId: Long, timeout: Duration) {
messageThreads[messageId] = threadId
messageTimeoutJobs.remove(messageId)?.cancel()
val timeoutJob = timeoutScope.launch {
delay(timeout)
// Conditional remove: fails if a response replaced or cleared our entry, so we never fire a stale timeout.
if (messageTimeoutJobs.remove(messageId, coroutineContext[Job])) {
Log.w(TAG, "[$messageId] Backfill request timed out after $timeout.")
clearMessageAndAwaiting(messageId)
_failures.tryEmit(BackfillFailure(messageId, threadId, FailureReason.TIMEOUT))
}
}
messageTimeoutJobs[messageId] = timeoutJob
}
/** Cancels the pending timeout for [messageId] without otherwise touching its wait state. */
private fun cancelTimeout(messageId: Long) {
messageTimeoutJobs.remove(messageId)?.cancel()
}
/** Drops all per-message bookkeeping. Does not touch [_awaiting], see [clearMessageAndAwaiting]. */
private fun clearMessage(messageId: Long) {
messageBackfillLastRequestedAt.remove(messageId)
messageThreads.remove(messageId)
messageAttachments.remove(messageId)
messageTimeoutJobs.remove(messageId)?.cancel()
}
private fun clearMessageAndAwaiting(messageId: Long) {
val ids = messageAttachments[messageId]
if (!ids.isNullOrEmpty()) {
_awaiting.update { it - ids }
}
clearMessage(messageId)
}
/**
* The eligible attachments of a message, display-ordered. The single source of truth for the protocol set, so the
* responder (which to re-upload) and requester (how to match the response) agree. See [backfillContractForMessage].
*/
@JvmStatic
fun backfillAttachmentsForMessage(messageId: Long): List<DatabaseAttachment> {
val record = SignalDatabase.messages.getMessageRecordOrNull(messageId) ?: return emptyList()
return SignalDatabase.attachments
.getAttachmentsForMessage(messageId)
.filter { isEligibleForBackfill(record, it) }
.sortedBy { it.displayOrder }
}
/**
* Splits the full on-diskl set to mirror the wire format, splitting eligible normal attachments from long text.
*/
@JvmStatic
fun backfillContractForMessage(messageId: Long): BackfillContract {
val all = backfillAttachmentsForMessage(messageId)
return BackfillContract(
bodyAttachments = all.filterNot { it.contentType == MediaUtil.LONG_TEXT },
longTextAttachment = all.firstOrNull { it.contentType == MediaUtil.LONG_TEXT }
)
}
@JvmStatic
fun isEligibleForBackfill(record: MessageRecord, attachment: DatabaseAttachment): Boolean {
if (record is MmsMessageRecord && record.storyType.isStory) {
return false
}
if (attachment.quote) {
return false
}
if (attachment.isSticker || MediaUtil.isLongTextType(attachment.contentType)) {
return true
}
if (record is MmsMessageRecord) {
val ineligibleIds = buildSet {
record.linkPreviews.mapNotNullTo(this) { it.attachmentId }
record.sharedContacts.mapNotNullTo(this) { it.avatar?.attachmentId }
}
if (attachment.attachmentId in ineligibleIds) {
return false
}
}
return true
}
@JvmStatic
fun awaitingChanges(): Observable<Set<AttachmentId>> = awaiting.asObservable()
@JvmStatic
fun isAwaitingBackfill(attachmentId: AttachmentId): Boolean = _awaiting.value.contains(attachmentId)
enum class FailureReason {
/** The primary did not respond before the timeout. */
TIMEOUT,
/** The primary responded but could not find the message to re-upload. */
NOT_FOUND
}
data class BackfillFailure(val messageId: Long, val threadId: Long, val reason: FailureReason)
/** Body attachments (the positional `attachments` array) and the optional long-text slot of the wire response. */
data class BackfillContract(val bodyAttachments: List<DatabaseAttachment>, val longTextAttachment: DatabaseAttachment?)
}
@@ -62,7 +62,8 @@ class AttachmentDownloadJob private constructor(
parameters: Parameters,
private val messageId: Long,
private val attachmentId: AttachmentId,
private val forceDownload: Boolean
private val forceDownload: Boolean,
private val requestSource: RequestSource
) : BaseJob(parameters) {
companion object {
@@ -72,6 +73,7 @@ class AttachmentDownloadJob private constructor(
private const val KEY_MESSAGE_ID = "message_id"
private const val KEY_ATTACHMENT_ID = "part_row_id"
private const val KEY_FORCE_DOWNLOAD = "part_manual"
private const val KEY_SOURCE = "requestSource"
@JvmStatic
fun constructQueueString(attachmentId: AttachmentId): String {
@@ -113,7 +115,8 @@ class AttachmentDownloadJob private constructor(
val downloadJob = AttachmentDownloadJob(
messageId = databaseAttachment.mmsId,
attachmentId = databaseAttachment.attachmentId,
forceDownload = true
forceDownload = true,
requestSource = RequestSource.USER
)
AppDependencies.jobManager.add(downloadJob)
downloadJob.id
@@ -144,9 +147,23 @@ class AttachmentDownloadJob private constructor(
}
}
constructor(messageId: Long, attachmentId: AttachmentId, forceDownload: Boolean) : this(messageId, attachmentId, forceDownload, forceDownload, forceDownload)
@JvmOverloads
constructor(
messageId: Long,
attachmentId: AttachmentId,
forceDownload: Boolean,
requestSource: RequestSource = RequestSource.AUTO
) : this(messageId, attachmentId, forceDownload, forceDownload, forceDownload, requestSource)
constructor(messageId: Long, attachmentId: AttachmentId, forceDownload: Boolean, skipInCallConstraint: Boolean, isHighPriority: Boolean) : this(
@JvmOverloads
constructor(
messageId: Long,
attachmentId: AttachmentId,
forceDownload: Boolean,
skipInCallConstraint: Boolean,
isHighPriority: Boolean,
requestSource: RequestSource = RequestSource.AUTO
) : this(
Parameters.Builder()
.setQueue(constructQueueString(attachmentId))
.addConstraint(NetworkConstraint.KEY)
@@ -157,7 +174,8 @@ class AttachmentDownloadJob private constructor(
.build(),
messageId,
attachmentId,
forceDownload
forceDownload,
requestSource
)
override fun serialize(): ByteArray? {
@@ -165,6 +183,7 @@ class AttachmentDownloadJob private constructor(
.putLong(KEY_MESSAGE_ID, messageId)
.putLong(KEY_ATTACHMENT_ID, attachmentId.id)
.putBoolean(KEY_FORCE_DOWNLOAD, forceDownload)
.putString(KEY_SOURCE, requestSource.name)
.serialize()
}
@@ -482,10 +501,48 @@ class AttachmentDownloadJob private constructor(
private fun markFailed(messageId: Long, attachmentId: AttachmentId) {
SignalDatabase.attachments.setTransferProgressFailed(attachmentId, messageId)
if (requestSource == RequestSource.BACKFILL) {
AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId)
} else {
maybeRequestBackfill()
}
}
private fun markPermanentlyFailed(messageId: Long, attachmentId: AttachmentId) {
SignalDatabase.attachments.setTransferProgressPermanentFailure(attachmentId, messageId)
if (requestSource == RequestSource.BACKFILL) {
AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId)
} else {
maybeRequestBackfill()
}
}
private fun maybeRequestBackfill() {
if (SignalStore.account.isPrimaryDevice || requestSource != RequestSource.USER) {
return
}
val attachment = SignalDatabase.attachments.getAttachment(attachmentId) ?: return
if (SignalStore.backup.backsUpMedia && attachment.dataHash != null && attachment.transferState == AttachmentTable.TRANSFER_PROGRESS_FAILED) {
Log.i(TAG, "[$attachmentId] Backs up media and have a plaintext hash; attempting archive restore before backfill")
RestoreAttachmentJob.forManualRestore(attachment)
return
}
AttachmentBackfill.maybeRequest(messageId, attachment)
}
enum class RequestSource {
AUTO,
USER,
BACKFILL;
companion object {
fun fromName(name: String?): RequestSource = entries.firstOrNull { it.name == name } ?: AUTO
}
}
class Factory : Job.Factory<AttachmentDownloadJob?> {
@@ -495,7 +552,8 @@ class AttachmentDownloadJob private constructor(
parameters = parameters,
messageId = data.getLong(KEY_MESSAGE_ID),
attachmentId = AttachmentId(data.getLong(KEY_ATTACHMENT_ID)),
forceDownload = data.getBoolean(KEY_FORCE_DOWNLOAD)
forceDownload = data.getBoolean(KEY_FORCE_DOWNLOAD),
requestSource = RequestSource.fromName(data.getStringOrDefault(KEY_SOURCE, null))
)
}
}
@@ -210,6 +210,7 @@ public final class JobManagerFactories {
put(MarkerJob.KEY, new MarkerJob.Factory());
put(MessageSendLogCleanupJob.KEY, new MessageSendLogCleanupJob.Factory());
put(MultiDeviceAttachmentBackfillMissingJob.KEY, new MultiDeviceAttachmentBackfillMissingJob.Factory());
put(MultiDeviceAttachmentBackfillRequestJob.KEY, new MultiDeviceAttachmentBackfillRequestJob.Factory());
put(MultiDeviceAttachmentBackfillUpdateJob.KEY, new MultiDeviceAttachmentBackfillUpdateJob.Factory());
put(MultiDeviceBlockedUpdateJob.KEY, new MultiDeviceBlockedUpdateJob.Factory());
put(MultiDeviceCallLinkSyncJob.KEY, new MultiDeviceCallLinkSyncJob.Factory());
@@ -0,0 +1,139 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import okio.ByteString.Companion.toByteString
import org.signal.core.util.logging.Log
import org.signal.core.util.orNull
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint
import org.thoughtcrime.securesms.jobs.protos.MultiDeviceAttachmentBackfillRequestJobData
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.recipients.Recipient
import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException
import org.whispersystems.signalservice.api.messages.multidevice.SignalServiceSyncMessage
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException
import org.whispersystems.signalservice.internal.push.AddressableMessage
import org.whispersystems.signalservice.internal.push.ConversationIdentifier
import org.whispersystems.signalservice.internal.push.SyncMessage
import java.io.IOException
import kotlin.time.Duration.Companion.seconds
/**
* Sent by a linked device to request the primary to re-upload attachments when we fail
* to restore them automatically but the user has specifically requested we retry.
*/
class MultiDeviceAttachmentBackfillRequestJob private constructor(
parameters: Parameters,
private val messageId: Long
) : Job(parameters) {
companion object {
private val TAG = Log.tag(MultiDeviceAttachmentBackfillRequestJob::class)
const val KEY = "MultiDeviceAttachmentBackfillRequestJob"
}
constructor(messageId: Long) : this(
Parameters.Builder()
.setLifespan(30.seconds.inWholeMilliseconds)
.setMaxAttempts(3)
.addConstraint(NetworkConstraint.KEY)
.addConstraint(SealedSenderConstraint.KEY)
.build(),
messageId
)
override fun getFactoryKey(): String = KEY
override fun serialize(): ByteArray {
return MultiDeviceAttachmentBackfillRequestJobData(messageId = messageId).encode()
}
override fun run(): Result {
if (SignalStore.account.isPrimaryDevice) {
Log.w(TAG, "Primary device should never request attachment backfill. Dropping.")
return Result.failure()
}
val record = SignalDatabase.messages.getMessageRecordOrNull(messageId)
if (record == null) {
Log.w(TAG, "[$messageId] No message record; cannot build backfill request.")
return Result.failure()
}
val author = record.fromRecipient
val authorServiceId = author.aci.orNull()?.toByteString()
if (authorServiceId == null) {
Log.w(TAG, "[$messageId] No serviceId for author ${author.id}; cannot build backfill request.")
return Result.failure()
}
val threadRecipient = SignalDatabase.threads.getRecipientForThreadId(record.threadId)
if (threadRecipient == null) {
Log.w(TAG, "[$messageId] No thread recipient for thread ${record.threadId}; cannot build backfill request.")
return Result.failure()
}
val conversation = threadRecipient.toBackfillConversationId()
if (conversation == null) {
Log.w(TAG, "[$messageId] No conversation identifier for recipient ${threadRecipient.id}; cannot build backfill request.")
return Result.failure()
}
val request = SyncMessage.AttachmentBackfillRequest(
targetMessage = AddressableMessage(
authorServiceIdBinary = authorServiceId,
sentTimestamp = record.dateSent
),
targetConversation = conversation
)
val syncMessage = SignalServiceSyncMessage.forAttachmentBackfillRequest(request)
return try {
val result = AppDependencies.signalServiceMessageSender.sendSyncMessage(syncMessage)
if (result.isSuccess) {
Log.i(TAG, "[${record.dateSent}] Sent attachment backfill request.")
Result.success()
} else {
Log.w(TAG, "[${record.dateSent}] Non-success send result; retrying.")
Result.retry(defaultBackoff())
}
} catch (e: ServerRejectedException) {
Log.w(TAG, e)
Result.failure()
} catch (e: IOException) {
Log.w(TAG, e)
Result.retry(defaultBackoff())
} catch (e: UntrustedIdentityException) {
Log.w(TAG, e)
Result.failure()
}
}
override fun onFailure() = Unit
private fun Recipient.toBackfillConversationId(): ConversationIdentifier? {
return when {
this.isGroup -> ConversationIdentifier(threadGroupId = this.requireGroupId().decodedId.toByteString())
this.hasAci -> ConversationIdentifier(threadServiceIdBinary = this.requireAci().toByteString())
this.hasPni -> ConversationIdentifier(threadServiceIdBinary = this.requirePni().toByteString())
this.hasE164 -> ConversationIdentifier(threadE164 = this.requireE164())
else -> null
}
}
class Factory : Job.Factory<MultiDeviceAttachmentBackfillRequestJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): MultiDeviceAttachmentBackfillRequestJob {
val data = MultiDeviceAttachmentBackfillRequestJobData.ADAPTER.decode(serializedData!!)
return MultiDeviceAttachmentBackfillRequestJob(parameters, data.messageId)
}
}
}
@@ -8,13 +8,11 @@ package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.attachments.toAttachmentPointer
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint
import org.thoughtcrime.securesms.jobs.protos.MultiDeviceAttachmentBackfillUpdateJobData
import org.thoughtcrime.securesms.util.MediaUtil
import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException
import org.whispersystems.signalservice.api.messages.multidevice.SignalServiceSyncMessage
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException
@@ -75,11 +73,9 @@ class MultiDeviceAttachmentBackfillUpdateJob(
}
override fun run(): Result {
val allAttachments = SignalDatabase.attachments.getAttachmentsForMessage(messageId)
val syncAttachments: List<DatabaseAttachment> = allAttachments
.filterNot { it.quote || it.contentType == MediaUtil.LONG_TEXT }
.sortedBy { it.displayOrder }
val longTextAttachment: DatabaseAttachment? = allAttachments.firstOrNull { it.contentType == MediaUtil.LONG_TEXT }
val contract = AttachmentBackfill.backfillContractForMessage(messageId)
val syncAttachments: List<DatabaseAttachment> = contract.bodyAttachments
val longTextAttachment: DatabaseAttachment? = contract.longTextAttachment
if (syncAttachments.isEmpty() && longTextAttachment == null) {
Log.w(TAG, "Failed to find any attachments for the message! Sending a missing response.")
@@ -277,14 +277,14 @@ class RestoreAttachmentJob private constructor(
SignalLocalMetrics.ArchiveAttachmentRestore.start(attachmentId)
val progressServiceController = BackupMediaRestoreService.start(context, context.getString(R.string.BackupStatus__restoring_media))
val progressServiceController = if (!manual) BackupMediaRestoreService.start(context, context.getString(R.string.BackupStatus__restoring_media)) else null
if (progressServiceController != null) {
progressServiceController.use {
retrieveAttachment(messageId, attachmentId, attachment)
}
} else {
Log.w(TAG, "Continuing without service.")
Log.w(TAG, "Continuing without service. manual: $manual")
retrieveAttachment(messageId, attachmentId, attachment)
}
@@ -506,12 +506,22 @@ class RestoreAttachmentJob private constructor(
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_FAILED)
}
maybeRequestBackfill()
}
private fun markPermanentlyFailed(attachmentId: AttachmentId) {
ArchiveDatabaseExecutor.runBlocking {
SignalDatabase.attachments.setRestoreTransferState(attachmentId, AttachmentTable.TRANSFER_PROGRESS_PERMANENT_FAILURE)
}
maybeRequestBackfill()
}
private fun maybeRequestBackfill() {
if (SignalStore.account.isPrimaryDevice || !manual) {
return
}
val attachment = SignalDatabase.attachments.getAttachment(attachmentId) ?: return
AttachmentBackfill.maybeRequest(messageId, attachment)
}
private fun maybePostFailedToDownloadFromArchiveNotification() {
@@ -64,6 +64,7 @@ import org.thoughtcrime.securesms.dependencies.KeyTransparencyApi
import org.thoughtcrime.securesms.groups.BadGroupIdException
import org.thoughtcrime.securesms.groups.GroupChangeBusyException
import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.jobs.AttachmentBackfill
import org.thoughtcrime.securesms.jobs.AttachmentDownloadJob
import org.thoughtcrime.securesms.jobs.AttachmentUploadJob
import org.thoughtcrime.securesms.jobs.MultiDeviceAttachmentBackfillMissingJob
@@ -135,6 +136,7 @@ import org.whispersystems.signalservice.internal.push.EditMessage
import org.whispersystems.signalservice.internal.push.Envelope
import org.whispersystems.signalservice.internal.push.StoryMessage
import org.whispersystems.signalservice.internal.push.SyncMessage
import org.whispersystems.signalservice.internal.push.SyncMessage.AttachmentBackfillResponse
import org.whispersystems.signalservice.internal.push.SyncMessage.Blocked
import org.whispersystems.signalservice.internal.push.SyncMessage.CallLinkUpdate
import org.whispersystems.signalservice.internal.push.SyncMessage.CallLogEvent
@@ -190,7 +192,7 @@ object SyncMessageProcessor {
syncMessage.callLogEvent != null -> handleSynchronizeCallLogEvent(syncMessage.callLogEvent!!, envelope.clientTimestamp!!)
syncMessage.deleteForMe != null -> handleSynchronizeDeleteForMe(context, syncMessage.deleteForMe!!, envelope.clientTimestamp!!, earlyMessageCacheEntry)
syncMessage.attachmentBackfillRequest != null -> handleSynchronizeAttachmentBackfillRequest(syncMessage.attachmentBackfillRequest!!, envelope.clientTimestamp!!)
syncMessage.attachmentBackfillResponse != null -> warn(envelope.clientTimestamp!!, "Contains a backfill response, but we don't handle these!")
syncMessage.attachmentBackfillResponse != null -> handleSynchronizeAttachmentBackfillResponse(syncMessage.attachmentBackfillResponse!!, envelope.clientTimestamp!!)
syncMessage.usernameChange != null -> handleSynchronizeUsernameChange(envelope.clientTimestamp!!)
else -> warn(envelope.clientTimestamp!!, "Contains no known sync types...")
}
@@ -1739,7 +1741,7 @@ object SyncMessageProcessor {
return
}
val attachments: List<DatabaseAttachment> = SignalDatabase.attachments.getAttachmentsForMessage(messageId).filterNot { it.quote }.sortedBy { it.displayOrder }
val attachments: List<DatabaseAttachment> = AttachmentBackfill.backfillAttachmentsForMessage(messageId)
if (attachments.isEmpty()) {
warn(timestamp, "[AttachmentBackfillRequest] There were no attachments found for the message! Enqueuing a 'missing' response.")
MultiDeviceAttachmentBackfillMissingJob.enqueue(request.targetMessage!!, request.targetConversation!!)
@@ -1747,7 +1749,7 @@ object SyncMessageProcessor {
}
val now = System.currentTimeMillis()
val needsUpload = attachments.filter { now - it.uploadTimestamp > 3.days.inWholeMilliseconds }
val needsUpload = attachments.filter { it.hasData && now - it.uploadTimestamp > 3.days.inWholeMilliseconds }
log(timestamp, "[AttachmentBackfillRequest] ${needsUpload.size}/${attachments.size} attachments need to be re-uploaded.")
for (attachment in needsUpload) {
@@ -1761,6 +1763,99 @@ object SyncMessageProcessor {
MultiDeviceAttachmentBackfillUpdateJob.enqueue(request.targetMessage!!, request.targetConversation!!, messageId)
}
private fun handleSynchronizeAttachmentBackfillResponse(response: AttachmentBackfillResponse, timestamp: Long) {
if (SignalStore.account.isPrimaryDevice) {
log(timestamp, "[AttachmentBackfillResponse] Primary device ignores attachment backfill response.")
return
}
if (response.targetMessage == null || response.targetConversation == null) {
warn(timestamp, "[AttachmentBackfillResponse] Missing targetMessage or targetConversation; dropping.")
return
}
val syncMessageId = response.targetMessage!!.toSyncMessageId(timestamp)
if (syncMessageId == null) {
warn(timestamp, "[AttachmentBackfillResponse] Invalid targetMessage; dropping.")
return
}
val conversationRecipientId = response.targetConversation!!.toRecipientId()
if (conversationRecipientId == null) {
warn(timestamp, "[AttachmentBackfillResponse] Unable to resolve targetConversation; dropping.")
return
}
val threadId = SignalDatabase.threads.getThreadIdFor(conversationRecipientId)
if (threadId == null) {
warn(timestamp, "[AttachmentBackfillResponse] No thread for conversation; dropping.")
return
}
val messageId = SignalDatabase.messages.getMessageIdOrNull(syncMessageId, threadId)
if (messageId == null) {
warn(timestamp, "[AttachmentBackfillResponse] Unable to find local message; dropping.")
return
}
if (response.error == AttachmentBackfillResponse.Error.MESSAGE_NOT_FOUND) {
log(timestamp, "[AttachmentBackfillResponse] Primary could not find message $messageId; marking attachments failed")
val attachments = AttachmentBackfill.backfillAttachmentsForMessage(messageId)
.filterNot { it.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE }
for (attachment in attachments) {
SignalDatabase.attachments.setTransferProgressFailed(attachment.attachmentId, messageId)
}
AttachmentBackfill.onResponseMessageNotFound(messageId, threadId)
return
}
val attachmentList = response.attachments
if (attachmentList == null) {
warn(timestamp, "[AttachmentBackfillResponse] Response has neither error nor attachments; dropping.")
return
}
val contract = AttachmentBackfill.backfillContractForMessage(messageId)
val localBody = contract.bodyAttachments
val localLongText = contract.longTextAttachment
var anyPending = false
var pointersApplied = 0
for ((index, remote) in attachmentList.attachments.withIndex()) {
val local = localBody.getOrNull(index)
if (local == null) {
warn(timestamp, "[AttachmentBackfillResponse] Remote attachment $index has no local row; skipping.")
continue
}
when (applyAttachmentData(local, remote, messageId)) {
// UPDATED/PENDING keep awaiting; the re-download terminates them later.
BackfillApplyResult.PENDING -> anyPending = true
BackfillApplyResult.UPDATED -> pointersApplied++
// Won't re-download, so stop awaiting now.
BackfillApplyResult.SKIPPED,
BackfillApplyResult.TERMINAL -> AttachmentBackfill.onAttachmentTerminal(local.attachmentId, messageId)
}
}
if (localLongText != null) {
if (attachmentList.longText != null) {
when (applyAttachmentData(localLongText, attachmentList.longText!!, messageId)) {
BackfillApplyResult.PENDING -> anyPending = true
BackfillApplyResult.UPDATED -> pointersApplied++
BackfillApplyResult.SKIPPED,
BackfillApplyResult.TERMINAL -> AttachmentBackfill.onAttachmentTerminal(localLongText.attachmentId, messageId)
}
} else {
AttachmentBackfill.onAttachmentTerminal(localLongText.attachmentId, messageId)
}
}
log(timestamp, "[AttachmentBackfillResponse] Processed response for messageId=$messageId pointersApplied=$pointersApplied anyPending=$anyPending")
AttachmentBackfill.onResponseProcessed(messageId, anyPending)
}
private fun handleSynchronizePniChangeNumber(envelope: Envelope, metadata: EnvelopeMetadata, pniChangeNumber: SyncMessage.PniChangeNumber) {
val timestamp = envelope.clientTimestamp!!
@@ -1854,6 +1949,46 @@ object SyncMessageProcessor {
AppDependencies.jobManager.add(PreKeysSyncJob.create(forceRotationRequested = true))
}
private fun applyAttachmentData(
local: DatabaseAttachment,
remote: AttachmentBackfillResponse.AttachmentData,
messageId: Long
): BackfillApplyResult {
if (local.transferState == AttachmentTable.TRANSFER_PROGRESS_DONE) {
return BackfillApplyResult.SKIPPED
}
if (remote.status == AttachmentBackfillResponse.AttachmentData.Status.PENDING) {
return BackfillApplyResult.PENDING
}
if (remote.status == AttachmentBackfillResponse.AttachmentData.Status.TERMINAL_ERROR) {
SignalDatabase.attachments.setTransferProgressPermanentFailure(local.attachmentId, messageId)
return BackfillApplyResult.TERMINAL
}
val attachment = remote.attachment?.toPointer() ?: return BackfillApplyResult.SKIPPED
val updated = SignalDatabase.attachments.updatePointerFromBackfill(local.attachmentId, attachment)
if (!updated) {
SignalDatabase.attachments.setTransferProgressPermanentFailure(local.attachmentId, messageId)
return BackfillApplyResult.TERMINAL
}
SignalDatabase.runPostSuccessfulTransaction {
AppDependencies.jobManager.add(
AttachmentDownloadJob(
messageId = messageId,
attachmentId = local.attachmentId,
forceDownload = true,
requestSource = AttachmentDownloadJob.RequestSource.BACKFILL
)
)
}
return BackfillApplyResult.UPDATED
}
private fun handleSynchronizedPollCreate(
envelope: Envelope,
message: DataMessage,
@@ -2090,4 +2225,6 @@ object SyncMessageProcessor {
return AttachmentTable.SyncAttachmentId(syncMessageId, uuid, digest, plaintextHash)
}
}
private enum class BackfillApplyResult { SKIPPED, PENDING, TERMINAL, UPDATED }
}
+4
View File
@@ -286,3 +286,7 @@ message AdminDeleteJobData {
message IndividualSendJobV2Data {
uint64 messageId = 1;
}
message MultiDeviceAttachmentBackfillRequestJobData {
uint64 messageId = 1;
}
+6
View File
@@ -721,6 +721,12 @@
<string name="ConversationFragment_group_names">Group names</string>
<!-- Snackbar toast message shown when a profile cannot be downloaded and to try again. -->
<string name="ConversationFragment_photo_failed">Photo failed to download. Try again.</string>
<!-- Title of the dialog shown when re-requesting an expired attachment from your primary device fails -->
<string name="ConversationFragment_attachment_backfill_failed_title">Can\'t download media</string>
<!-- Body of the dialog shown when your phone didn\'t respond to a request to re-send an expired attachment -->
<string name="ConversationFragment_attachment_backfill_timeout">Check this device and your phone\'s internet connection. Open Signal on your phone, then try downloading again.</string>
<!-- Body of the dialog shown when your phone no longer has the attachment you tried to download -->
<string name="ConversationFragment_attachment_backfill_not_found">This media is no longer available on your phone and can\'t be downloaded.</string>
<!-- Dialog for how to long to keep a messaged pinned for -->
<string name="ConversationFragment__keep_pinned">Keep pinned for…</string>
<!-- Dialog option to keep message pin for 24 hours -->
@@ -16,8 +16,11 @@ import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import org.signal.core.models.database.AttachmentId
import org.signal.core.util.bytes
import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.Cdn
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.mms.Slide
import org.thoughtcrime.securesms.util.MediaUtil
@@ -26,6 +29,9 @@ import org.whispersystems.signalservice.internal.crypto.PaddingInputStream
class TransferControlsTest {
/** Distinct default attachmentId per slide so DatabaseAttachment equality (by id) doesn't collapse map keys. */
private var nextSlideId: Long = 1000L
@Before
fun setUp() {
mockkStatic(MediaUtil::class)
@@ -56,8 +62,9 @@ class TransferControlsTest {
@Test
fun `awaiting primary single item is centered indeterminate non-cancelable`() {
val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE)), awaitingPrimaryResponse = true)
val render = TransferControls.deriveRenderState(state) as TransferControlsRenderState.InProgress
val id = AttachmentId(1L)
val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = id)))
val render = TransferControls.deriveRenderState(state, setOf(id)) as TransferControlsRenderState.InProgress
assertNull(render.progress)
assertEquals(TransferControls.Placement.CENTER, render.placement)
assertFalse(render.cancelable)
@@ -66,19 +73,29 @@ class TransferControlsTest {
@Test
fun `awaiting primary gallery is corner indeterminate`() {
val first = AttachmentId(1L)
val second = AttachmentId(2L)
val state = stateOf(
listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE), slide(AttachmentTable.TRANSFER_NEEDS_RESTORE)),
awaitingPrimaryResponse = true
listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = first), slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = second))
)
val render = TransferControls.deriveRenderState(state) as TransferControlsRenderState.InProgress
val render = TransferControls.deriveRenderState(state, setOf(first, second)) as TransferControlsRenderState.InProgress
assertNull(render.progress)
assertEquals(TransferControls.Placement.CORNER, render.placement)
}
@Test
fun `awaiting primary still Gone when not visible`() {
val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE)), awaitingPrimaryResponse = true, isVisible = false)
assertEquals(TransferControlsRenderState.Gone, TransferControls.deriveRenderState(state))
val id = AttachmentId(1L)
val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_NEEDS_RESTORE, attachmentId = id)), isVisible = false)
assertEquals(TransferControlsRenderState.Gone, TransferControls.deriveRenderState(state, setOf(id)))
}
@Test
fun `awaiting primary yields to download progress once the re-download starts`() {
val id = AttachmentId(1L)
val state = stateOf(listOf(slide(AttachmentTable.TRANSFER_PROGRESS_STARTED, attachmentId = id)))
val render = TransferControls.deriveRenderState(state, setOf(id)) as TransferControlsRenderState.InProgress
assertTrue(render.cancelable)
}
@Test
@@ -286,9 +303,12 @@ class TransferControlsTest {
private fun slide(
transferState: Int,
hasVideo: Boolean = false,
size: Long = 1024
size: Long = 1024,
attachmentId: AttachmentId = AttachmentId(nextSlideId++)
): Slide {
val attachment = mockk<Attachment>(relaxed = true)
// asAttachment() returns a real DatabaseAttachment: deriveRenderState reads its attachmentId (a @JvmField,
// so unmockkable) to decide whether the slide is awaiting backfill.
val attachment = databaseAttachment(attachmentId, transferState)
val slide = mockk<Slide>(relaxed = true)
every { slide.transferState } returns transferState
every { slide.hasVideo() } returns hasVideo
@@ -297,13 +317,52 @@ class TransferControlsTest {
return slide
}
private fun databaseAttachment(attachmentId: AttachmentId, transferProgress: Int): DatabaseAttachment {
return DatabaseAttachment(
attachmentId = attachmentId,
mmsId = 1L,
hasData = false,
hasThumbnail = false,
contentType = "image/jpeg",
transferProgress = transferProgress,
size = 1024L,
fileName = "photo.jpg",
cdn = Cdn.CDN_3,
location = null,
key = null,
digest = null,
incrementalDigest = null,
incrementalMacChunkSize = 0,
fastPreflightId = null,
voiceNote = false,
borderless = false,
videoGif = false,
width = 0,
height = 0,
quote = false,
caption = null,
stickerLocator = null,
blurHash = null,
audioHash = null,
transformProperties = null,
displayOrder = 0,
uploadTimestamp = 0,
dataHash = null,
archiveCdn = null,
thumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE,
archiveTransferState = AttachmentTable.ArchiveTransferState.NONE,
uuid = null,
quoteTargetContentType = null,
metadata = null
)
}
private fun stateOf(
slides: List<Slide>,
isUpload: Boolean = false,
playableWhileDownloading: Boolean = false,
isVisible: Boolean = true,
showSecondaryText: Boolean = true,
awaitingPrimaryResponse: Boolean = false,
networkProgress: Map<Attachment, TransferControlView.Progress> = slides.associate { it.asAttachment() to TransferControlView.Progress(0L.bytes, 1024L.bytes) },
compressionProgress: Map<Attachment, TransferControlView.Progress> = emptyMap()
): TransferControlViewState {
@@ -313,7 +372,6 @@ class TransferControlsTest {
playableWhileDownloading = playableWhileDownloading,
isVisible = isVisible,
showSecondaryText = showSecondaryText,
awaitingPrimaryResponse = awaitingPrimaryResponse,
networkProgress = networkProgress,
compressionProgress = compressionProgress
)
@@ -0,0 +1,507 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import android.app.Application
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkObject
import io.mockk.unmockkObject
import io.mockk.verify
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
import org.robolectric.annotation.Config
import org.signal.core.models.database.AttachmentId
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.attachments.Cdn
import org.thoughtcrime.securesms.attachments.DatabaseAttachment
import org.thoughtcrime.securesms.contactshare.Contact
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.MessageTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.MessageRecord
import org.thoughtcrime.securesms.database.model.MmsMessageRecord
import org.thoughtcrime.securesms.database.model.StoryType
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.JobManager
import org.thoughtcrime.securesms.linkpreview.LinkPreview
import org.thoughtcrime.securesms.testutil.MockAppDependenciesRule
import org.thoughtcrime.securesms.testutil.MockSignalStoreRule
import org.thoughtcrime.securesms.testutil.SystemOutLogger
@RunWith(RobolectricTestRunner::class)
@Config(manifest = Config.NONE, application = Application::class)
class AttachmentBackfillTest {
companion object {
private var nextId: Long = 1000L
}
@get:Rule
val mockSignalStore = MockSignalStoreRule()
@get:Rule
val appDependencies = MockAppDependenciesRule()
private lateinit var jobManager: JobManager
private lateinit var messages: MessageTable
private lateinit var record: MessageRecord
@Before
fun setUp() {
Log.initialize(SystemOutLogger())
every { mockSignalStore.account.isLinkedDevice } returns true
every { mockSignalStore.account.isPrimaryDevice } returns false
jobManager = AppDependencies.jobManager
messages = mockk(relaxed = true)
mockkObject(SignalDatabase.Companion)
every { SignalDatabase.messages } returns messages
record = mockk(relaxed = true)
every { record.isOutgoing } returns false
every { messages.getMessageRecordOrNull(any()) } returns record
}
@After
fun tearDown() {
// Restore the production scope so a test-scheduler scope can't leak into the next test.
AttachmentBackfill.timeoutScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
unmockkObject(SignalDatabase.Companion)
}
/**
* Builds a real [DatabaseAttachment]. [Attachment.transferState]/[Attachment.quote] and [attachmentId] are read
* directly by the backfill logic and can't be stubbed on a mock (they're @JvmField / final), so we construct a
* concrete instance.
*/
private fun databaseAttachment(
attachmentId: AttachmentId = AttachmentId(1L),
transferProgress: Int = AttachmentTable.TRANSFER_PROGRESS_FAILED,
quote: Boolean = false,
contentType: String = "image/jpeg",
displayOrder: Int = 0
): DatabaseAttachment {
return DatabaseAttachment(
attachmentId = attachmentId,
mmsId = 42L,
hasData = false,
hasThumbnail = false,
contentType = contentType,
transferProgress = transferProgress,
size = 1_000L,
fileName = "photo.jpg",
cdn = Cdn.CDN_3,
location = null,
key = null,
digest = null,
incrementalDigest = null,
incrementalMacChunkSize = 0,
fastPreflightId = null,
voiceNote = false,
borderless = false,
videoGif = false,
width = 0,
height = 0,
quote = quote,
caption = null,
stickerLocator = null,
blurHash = null,
audioHash = null,
transformProperties = null,
displayOrder = displayOrder,
uploadTimestamp = 0,
dataHash = null,
archiveCdn = null,
thumbnailRestoreState = AttachmentTable.ThumbnailRestoreState.NONE,
archiveTransferState = AttachmentTable.ArchiveTransferState.NONE,
uuid = null,
quoteTargetContentType = null,
metadata = null
)
}
@Test
fun `enqueues when all gates pass`() {
val messageId = nextMessageId()
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
verify(exactly = 1) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
}
@Test
fun `skips when primary device`() {
every { mockSignalStore.account.isPrimaryDevice } returns true
val messageId = nextMessageId()
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
verify(exactly = 0) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
}
@Test
fun `skips when message record is missing`() {
every { messages.getMessageRecordOrNull(any()) } returns null
val messageId = nextMessageId()
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
verify(exactly = 0) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
}
@Test
fun `dedups consecutive requests for the same message`() {
val messageId = nextMessageId()
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
verify(exactly = 1) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
}
@Test
fun `clearPending allows requeue for the same message`() {
val messageId = nextMessageId()
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
AttachmentBackfill.clearPending(messageId)
AttachmentBackfill.maybeRequest(messageId, attachmentFor(messageId))
verify(exactly = 2) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
}
@Test
fun `skips quote attachments as ineligible kind`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId, quote = true))
verify(exactly = 0) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `skips attachments on story messages`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
val storyRecord = mockk<MmsMessageRecord>(relaxed = true)
every { storyRecord.storyType } returns StoryType.STORY_WITH_REPLIES
every { messages.getMessageRecordOrNull(any()) } returns storyRecord
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
verify(exactly = 0) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `skips link-preview attachments as ineligible kind`() {
val messageId = nextMessageId()
val previewId = AttachmentId(messageId)
val preview = mockk<LinkPreview>()
every { preview.attachmentId } returns previewId
val mmsRecord = mockk<MmsMessageRecord>(relaxed = true)
every { mmsRecord.storyType } returns StoryType.NONE
every { mmsRecord.linkPreviews } returns listOf(preview)
every { mmsRecord.sharedContacts } returns emptyList()
every { messages.getMessageRecordOrNull(any()) } returns mmsRecord
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = previewId))
verify(exactly = 0) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
assertFalse(AttachmentBackfill.awaiting.value.contains(previewId))
}
@Test
fun `skips contact-share avatar attachments as ineligible kind`() {
val messageId = nextMessageId()
val avatarId = AttachmentId(messageId)
val avatar = mockk<Contact.Avatar>()
every { avatar.attachmentId } returns avatarId
val contact = mockk<Contact>()
every { contact.avatar } returns avatar
val mmsRecord = mockk<MmsMessageRecord>(relaxed = true)
every { mmsRecord.storyType } returns StoryType.NONE
every { mmsRecord.linkPreviews } returns emptyList()
every { mmsRecord.sharedContacts } returns listOf(contact)
every { messages.getMessageRecordOrNull(any()) } returns mmsRecord
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = avatarId))
verify(exactly = 0) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
assertFalse(AttachmentBackfill.awaiting.value.contains(avatarId))
}
@Test
fun `allows long-text overflow attachments`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId, contentType = "text/x-signal-plain"))
verify(exactly = 1) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `backfillContractForMessage builds the positional body and longText contract both ends share`() {
val messageId = nextMessageId()
val body1Id = AttachmentId(101)
val body2Id = AttachmentId(102)
val longTextId = AttachmentId(103)
val quoteId = AttachmentId(104)
val previewId = AttachmentId(105)
val contactId = AttachmentId(106)
val body1 = databaseAttachment(attachmentId = body1Id, displayOrder = 1)
val body2 = databaseAttachment(attachmentId = body2Id, displayOrder = 2)
val longText = databaseAttachment(attachmentId = longTextId, displayOrder = 3, contentType = "text/x-signal-plain")
val quote = databaseAttachment(attachmentId = quoteId, displayOrder = 4, quote = true)
val preview = databaseAttachment(attachmentId = previewId, displayOrder = 5)
val contact = databaseAttachment(attachmentId = contactId, displayOrder = 6)
val linkPreview = mockk<LinkPreview>()
every { linkPreview.attachmentId } returns previewId
val avatar = mockk<Contact.Avatar>()
every { avatar.attachmentId } returns contactId
val sharedContact = mockk<Contact>()
every { sharedContact.avatar } returns avatar
val mmsRecord = mockk<MmsMessageRecord>(relaxed = true)
every { mmsRecord.storyType } returns StoryType.NONE
every { mmsRecord.linkPreviews } returns listOf(linkPreview)
every { mmsRecord.sharedContacts } returns listOf(sharedContact)
every { messages.getMessageRecordOrNull(any()) } returns mmsRecord
val attachmentsTable = mockk<AttachmentTable>()
every { SignalDatabase.attachments } returns attachmentsTable
// Deliberately out of display order to prove the contract sorts.
every { attachmentsTable.getAttachmentsForMessage(messageId) } returns listOf(body2, longText, quote, body1, preview, contact)
val contract = AttachmentBackfill.backfillContractForMessage(messageId)
// contract.body IS the positional wire array both ends share: index 0 -> body1, 1 -> body2.
assertEquals(listOf(body1Id, body2Id), contract.bodyAttachments.map { it.attachmentId })
assertEquals(longTextId, contract.longTextAttachment?.attachmentId)
}
@Test
fun `flags the attachment as awaiting on request`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `final response keeps the attachment awaiting until its re-download terminates`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onResponseProcessed(messageId, anyPending = false)
// The flag clears on the re-download terminal, not on the response — that's what avoids the retry flash.
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `pending response keeps the attachment awaiting`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onResponseProcessed(messageId, anyPending = true)
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `onAttachmentTerminal clears that attachment`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId)
assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId))
}
@Test
fun `terminal of one attachment leaves the others of the message awaiting`() {
val messageId = nextMessageId()
val first = AttachmentId(messageId)
val second = AttachmentId(-messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = first))
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = second))
AttachmentBackfill.onAttachmentTerminal(first, messageId)
assertFalse(AttachmentBackfill.awaiting.value.contains(first))
assertTrue(AttachmentBackfill.awaiting.value.contains(second))
}
@Test
fun `last attachment terminal clears bookkeeping so the message can be re-requested`() {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onAttachmentTerminal(attachmentId, messageId)
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
verify(exactly = 2) { jobManager.add(any<MultiDeviceAttachmentBackfillRequestJob>()) }
}
@Test
fun `initial response timeout fires a TIMEOUT failure and clears awaiting`() = runTest {
AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler))
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
every { record.threadId } returns 77L
val received = mutableListOf<AttachmentBackfill.BackfillFailure>()
val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
AttachmentBackfill.failures.collect { received += it }
}
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
testScheduler.advanceUntilIdle()
assertEquals(1, received.size)
assertEquals(messageId, received.first().messageId)
assertEquals(77L, received.first().threadId)
assertEquals(AttachmentBackfill.FailureReason.TIMEOUT, received.first().reason)
assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId))
collector.cancel()
}
@Test
fun `pending response extends the timeout beyond the initial window`() = runTest {
AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler))
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
every { record.threadId } returns 5L
val received = mutableListOf<AttachmentBackfill.BackfillFailure>()
val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
AttachmentBackfill.failures.collect { received += it }
}
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onResponseProcessed(messageId, anyPending = true)
// Past the 30s initial window but within the 2m pending window: the extended timer hasn't fired.
testScheduler.advanceTimeBy(60_000)
testScheduler.runCurrent()
assertTrue(received.isEmpty())
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
// Past the 2m pending window: now it fires.
testScheduler.advanceUntilIdle()
assertEquals(1, received.size)
assertEquals(AttachmentBackfill.FailureReason.TIMEOUT, received.first().reason)
assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId))
collector.cancel()
}
@Test
fun `final response cancels the timeout so no failure is emitted`() = runTest {
AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler))
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
val received = mutableListOf<AttachmentBackfill.BackfillFailure>()
val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
AttachmentBackfill.failures.collect { received += it }
}
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onResponseProcessed(messageId, anyPending = false)
testScheduler.advanceUntilIdle()
assertTrue(received.isEmpty())
// Awaiting persists until the re-download terminates; the final response alone doesn't clear it.
assertTrue(AttachmentBackfill.awaiting.value.contains(attachmentId))
collector.cancel()
}
@Test
fun `pending response for an untracked message starts no timeout`() = runTest {
AttachmentBackfill.timeoutScope = CoroutineScope(StandardTestDispatcher(testScheduler))
val messageId = nextMessageId()
val received = mutableListOf<AttachmentBackfill.BackfillFailure>()
val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
AttachmentBackfill.failures.collect { received += it }
}
// No maybeRequest first, so the message isn't tracked: a stray pending response must not arm a timer.
AttachmentBackfill.onResponseProcessed(messageId, anyPending = true)
testScheduler.advanceUntilIdle()
assertTrue(received.isEmpty())
collector.cancel()
}
@Test
fun `onResponseMessageNotFound clears awaiting and emits NOT_FOUND failure`() = runTest {
val messageId = nextMessageId()
val attachmentId = AttachmentId(messageId)
val received = mutableListOf<AttachmentBackfill.BackfillFailure>()
val collector = backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
AttachmentBackfill.failures.collect { received += it }
}
AttachmentBackfill.maybeRequest(messageId, databaseAttachment(attachmentId = attachmentId))
AttachmentBackfill.onResponseMessageNotFound(messageId, threadId = 42L)
assertFalse(AttachmentBackfill.awaiting.value.contains(attachmentId))
assertEquals(1, received.size)
assertEquals(messageId, received.first().messageId)
assertEquals(42L, received.first().threadId)
assertEquals(AttachmentBackfill.FailureReason.NOT_FOUND, received.first().reason)
collector.cancel()
}
private fun attachmentFor(messageId: Long): DatabaseAttachment = databaseAttachment(attachmentId = AttachmentId(messageId))
private fun nextMessageId(): Long {
val id = nextId++
AttachmentBackfill.clearPending(id)
return id
}
}
@@ -766,6 +766,9 @@ public class SignalServiceMessageSender {
content = createDeviceNameChangeContent(message.getDeviceNameChange().get());
} else if (message.getAttachmentBackfillResponse().isPresent()) {
content = createAttachmentBackfillResponseContent(message.getAttachmentBackfillResponse().get());
} else if (message.getAttachmentBackfillRequest().isPresent()) {
content = createAttachmentBackfillRequestContent(message.getAttachmentBackfillRequest().get());
urgent = true;
} else {
throw new IOException("Unsupported sync message!");
}
@@ -1783,6 +1786,13 @@ public class SignalServiceMessageSender {
return container.syncMessage(builder.build()).build();
}
private Content createAttachmentBackfillRequestContent(SyncMessage.AttachmentBackfillRequest proto) {
Content.Builder container = new Content.Builder();
SyncMessage.Builder builder = createSyncMessageBuilder().attachmentBackfillRequest(proto);
return container.syncMessage(builder.build()).build();
}
private SyncMessage.Builder createSyncMessageBuilder() {
byte[] padding = Util.getRandomLengthSecretBytes(512);
@@ -6,12 +6,12 @@
package org.whispersystems.signalservice.api.messages.multidevice;
import org.whispersystems.signalservice.internal.push.SyncMessage;
import org.whispersystems.signalservice.internal.push.SyncMessage.AttachmentBackfillRequest;
import org.whispersystems.signalservice.internal.push.SyncMessage.AttachmentBackfillResponse;
import org.whispersystems.signalservice.internal.push.SyncMessage.DeviceNameChange;
import org.whispersystems.signalservice.internal.push.SyncMessage.CallEvent;
import org.whispersystems.signalservice.internal.push.SyncMessage.CallLinkUpdate;
import org.whispersystems.signalservice.internal.push.SyncMessage.CallLogEvent;
import org.whispersystems.signalservice.internal.push.SyncMessage.DeviceNameChange;
import java.util.LinkedList;
import java.util.List;
@@ -40,6 +40,7 @@ public class SignalServiceSyncMessage {
private final Optional<CallLogEvent> callLogEvent;
private final Optional<DeviceNameChange> deviceNameChange;
private final Optional<AttachmentBackfillResponse> attachmentBackfillResponse;
private final Optional<AttachmentBackfillRequest> attachmentBackfillRequest;
private SignalServiceSyncMessage(Optional<SentTranscriptMessage> sent,
Optional<ContactsMessage> contacts,
@@ -59,7 +60,8 @@ public class SignalServiceSyncMessage {
Optional<CallLinkUpdate> callLinkUpdate,
Optional<CallLogEvent> callLogEvent,
Optional<DeviceNameChange> deviceNameChange,
Optional<AttachmentBackfillResponse> attachmentBackfillResponse)
Optional<AttachmentBackfillResponse> attachmentBackfillResponse,
Optional<AttachmentBackfillRequest> attachmentBackfillRequest)
{
this.sent = sent;
this.contacts = contacts;
@@ -80,6 +82,7 @@ public class SignalServiceSyncMessage {
this.callLogEvent = callLogEvent;
this.deviceNameChange = deviceNameChange;
this.attachmentBackfillResponse = attachmentBackfillResponse;
this.attachmentBackfillRequest = attachmentBackfillRequest;
}
public static SignalServiceSyncMessage forSentTranscript(SentTranscriptMessage sent) {
@@ -101,6 +104,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -123,6 +127,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -145,6 +150,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -167,6 +173,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -189,6 +196,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -211,6 +219,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -236,6 +245,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -258,6 +268,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -280,6 +291,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -302,6 +314,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -324,6 +337,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -346,6 +360,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -368,6 +383,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -390,6 +406,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -412,6 +429,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -434,6 +452,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -456,6 +475,7 @@ public class SignalServiceSyncMessage {
Optional.of(callLinkUpdate),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -478,6 +498,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.of(callLogEvent),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -500,6 +521,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.of(deviceNameChange),
Optional.empty(),
Optional.empty());
}
@@ -522,7 +544,31 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(backfillResponse));
Optional.of(backfillResponse),
Optional.empty());
}
public static SignalServiceSyncMessage forAttachmentBackfillRequest(@Nonnull AttachmentBackfillRequest backfillRequest) {
return new SignalServiceSyncMessage(Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(backfillRequest));
}
public static SignalServiceSyncMessage empty() {
@@ -544,6 +590,7 @@ public class SignalServiceSyncMessage {
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
}
@@ -623,6 +670,10 @@ public class SignalServiceSyncMessage {
return attachmentBackfillResponse;
}
public Optional<AttachmentBackfillRequest> getAttachmentBackfillRequest() {
return attachmentBackfillRequest;
}
public enum FetchType {
LOCAL_PROFILE,
STORAGE_MANIFEST,