Attempt to resend pending messages if job creation failed.

This commit is contained in:
Cody Henthorne
2025-06-30 16:11:22 -04:00
committed by Greyson Parrelli
parent 173983a1ab
commit f540886bb8
7 changed files with 166 additions and 11 deletions

View File

@@ -62,6 +62,7 @@ import org.thoughtcrime.securesms.jobs.CheckServiceReachabilityJob;
import org.thoughtcrime.securesms.jobs.DownloadLatestEmojiDataJob;
import org.thoughtcrime.securesms.jobs.EmojiSearchIndexDownloadJob;
import org.thoughtcrime.securesms.jobs.FcmRefreshJob;
import org.thoughtcrime.securesms.jobs.RetryPendingSendsJob;
import org.thoughtcrime.securesms.jobs.FontDownloaderJob;
import org.thoughtcrime.securesms.jobs.GroupRingCleanupJob;
import org.thoughtcrime.securesms.jobs.GroupV2UpdateSelfProfileKeyJob;
@@ -110,7 +111,6 @@ import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util;
import org.thoughtcrime.securesms.util.VersionTracker;
import org.thoughtcrime.securesms.util.dynamiclanguage.DynamicLanguageContextWrapper;
import org.whispersystems.signalservice.api.backup.MediaName;
import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
import java.io.InterruptedIOException;
@@ -229,6 +229,7 @@ public class ApplicationContext extends Application implements AppForegroundObse
.addPostRender(() -> ActiveCallManager.clearNotifications(this))
.addPostRender(() -> GroupSendEndorsementInternalNotifier.init())
.addPostRender(RestoreOptimizedMediaJob::enqueueIfNecessary)
.addPostRender(RetryPendingSendsJob::enqueueForAll)
.execute();
Log.d(TAG, "onCreate() took " + (System.currentTimeMillis() - startTime) + " ms");

View File

@@ -146,6 +146,8 @@ import java.util.UUID
import java.util.function.Function
import kotlin.math.max
import kotlin.math.min
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.milliseconds
open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : DatabaseTable(context, databaseHelper), MessageTypes, RecipientIdDatabaseReference, ThreadIdDatabaseReference {
@@ -1997,6 +1999,19 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
}
}
fun getRecentPendingMessages(): MmsReader {
val now = System.currentTimeMillis()
val oneDayAgo = now.milliseconds - 1.days
val messageIds = readableDatabase
.select(ID)
.from(TABLE_NAME)
.where("$DATE_SENT > ${oneDayAgo.inWholeMilliseconds} AND $DATE_SENT < $now AND ($TYPE & ${MessageTypes.BASE_TYPE_MASK}) = ${MessageTypes.BASE_SENDING_TYPE} AND $SCHEDULED_DATE = -1")
.run()
.readToList { it.requireLong(ID) }
return getMessages(messageIds)
}
private fun getOriginalEditedMessageRecord(messageId: Long): Long {
return readableDatabase.select(ID)
.from(TABLE_NAME)
@@ -5304,15 +5319,6 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
val type: Long
)
enum class MessageStatus {
PENDING,
SENT,
DELIVERED,
READ,
VIEWED,
FAILED
}
data class SyncMessageId(
val recipientId: RecipientId,
val timetamp: Long

View File

@@ -256,6 +256,8 @@ public final class JobManagerFactories {
put(SyncSystemContactLinksJob.KEY, new SyncSystemContactLinksJob.Factory());
put(MultiDeviceStorySendSyncJob.KEY, new MultiDeviceStorySendSyncJob.Factory());
put(ResetSvrGuessCountJob.KEY, new ResetSvrGuessCountJob.Factory());
put(RetryPendingSendsJob.KEY, new RetryPendingSendsJob.Factory());
put(RetryPendingSendSecondCheckJob.KEY, new RetryPendingSendSecondCheckJob.Factory());
put(ServiceOutageDetectionJob.KEY, new ServiceOutageDetectionJob.Factory());
put(StickerDownloadJob.KEY, new StickerDownloadJob.Factory());
put(StickerPackDownloadJob.KEY, new StickerPackDownloadJob.Factory());

View File

@@ -0,0 +1,66 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.MessageId
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobs.protos.SecondRoundFixupSendJobData
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.sms.MessageSender
import kotlin.time.Duration.Companion.days
/**
* Only enqueued by [RetryPendingSendsJob] for messages that are pending on app launch. When this job runs, if a message is still pending
* then no send job happened between app launch and draining of that thread's message send job queue. Thus it should be safe
* to try to resend the message.
*
* Note job is in-memory only to prevent multiple instances being enqueued per message.
*/
class RetryPendingSendSecondCheckJob private constructor(parameters: Parameters, private val messageId: MessageId) : Job(parameters) {
companion object {
const val KEY = "RetryPendingSendSecondCheckJob"
private val TAG = Log.tag(RetryPendingSendSecondCheckJob::class)
}
constructor(messageId: MessageId, threadRecipient: Recipient, hasMedia: Boolean) : this(
parameters = Parameters.Builder()
.setQueue(threadRecipient.id.toQueueKey(hasMedia))
.setLifespan(1.days.inWholeMilliseconds)
.setMemoryOnly(true)
.build(),
messageId = messageId
)
override fun serialize(): ByteArray? = SecondRoundFixupSendJobData(messageId = messageId.id).encode()
override fun getFactoryKey(): String = KEY
override fun run(): Result {
val messageRecord = SignalDatabase.messages.getMessageRecord(messageId.id)
if (!messageRecord.isPending) {
return Result.success()
}
Log.w(TAG, "[${messageRecord.dateSent}] Still pending after queue drain, re-sending MessageId::${messageId.id}!")
MessageSender.resend(context, messageRecord)
return Result.success()
}
override fun onFailure() = Unit
class Factory : Job.Factory<RetryPendingSendSecondCheckJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): RetryPendingSendSecondCheckJob {
val data = SecondRoundFixupSendJobData.ADAPTER.decode(serializedData!!)
return RetryPendingSendSecondCheckJob(parameters, MessageId(data.messageId))
}
}
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.MessageId
import org.thoughtcrime.securesms.database.model.MmsMessageRecord
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import kotlin.time.Duration.Companion.days
/**
* It's possible for a message to be inserted into the database but the job to send it to be lost. This
* job tries to catch those rare situations by finding any pending messages and enqueue a second check
* at the end of the thread's message sending queue.
*
* If that second job still sees the message as pending, then it will try to resend it.
*
* Note job is in-memory only as it should only run once on app launch and the job will always
* be last in the overall queue at time of submission.
*/
class RetryPendingSendsJob private constructor(parameters: Parameters) : Job(parameters) {
companion object {
const val KEY = "RetryPendingSendsJob"
private val TAG = Log.tag(RetryPendingSendsJob::class)
@JvmStatic
fun enqueueForAll() {
AppDependencies.jobManager.add(RetryPendingSendsJob())
}
}
private constructor() : this(
Parameters.Builder()
.setLifespan(1.days.inWholeMilliseconds)
.setMaxInstancesForFactory(1)
.setMemoryOnly(true)
.build()
)
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun run(): Result {
SignalDatabase.messages.getRecentPendingMessages().use { reader ->
reader.forEach { message ->
val threadRecipient = SignalDatabase.threads.getRecipientForThreadId(message.threadId)
if (threadRecipient != null) {
val hasMedia = (message as? MmsMessageRecord)?.slideDeck?.slides?.isNotEmpty() == true
Log.d(TAG, "[${message.dateSent}] Found pending message MessageId::${message.id}, enqueueing second check job")
AppDependencies.jobManager.add(RetryPendingSendSecondCheckJob(MessageId(message.id), threadRecipient, hasMedia))
}
}
}
return Result.success()
}
override fun onFailure() = Unit
class Factory : Job.Factory<RetryPendingSendsJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): RetryPendingSendsJob {
return RetryPendingSendsJob(parameters)
}
}
}

View File

@@ -182,6 +182,10 @@ final class MessageHeaderViewHolder extends RecyclerView.ViewHolder implements G
if (messageRecord.isPending() || messageRecord.isFailed()) {
sentDate.setText(formatBoldString(R.string.message_details_header_sent, "-"));
sentDate.setOnLongClickListener(v -> {
copyToClipboard(String.valueOf(messageRecord.getDateSent()));
return true;
});
receivedDate.setVisibility(View.GONE);
} else {
Locale dateLocale = Locale.getDefault();

View File

@@ -238,4 +238,8 @@ message BackupDeleteJobData {
repeated Stage completed = 1;
Tier tier = 2;
}
}
message SecondRoundFixupSendJobData {
uint64 messageId = 1;
}