Make ending a poll a blocking job.

This commit is contained in:
Michelle Tang
2025-11-05 11:30:14 -05:00
parent 6a6b56e50c
commit a79a059816
11 changed files with 161 additions and 16 deletions

View File

@@ -22,6 +22,8 @@ import android.graphics.PorterDuffColorFilter
import android.graphics.Rect
import android.net.Uri
import android.os.Bundle
import android.os.Handler
import android.os.Looper
import android.provider.Browser
import android.provider.ContactsContract
import android.provider.Settings
@@ -134,6 +136,7 @@ import org.thoughtcrime.securesms.components.HidingLinearLayout
import org.thoughtcrime.securesms.components.InputAwareConstraintLayout
import org.thoughtcrime.securesms.components.InputPanel
import org.thoughtcrime.securesms.components.InsetAwareConstraintLayout
import org.thoughtcrime.securesms.components.ProgressCardDialogFragment
import org.thoughtcrime.securesms.components.ScrollToPositionDelegate
import org.thoughtcrime.securesms.components.SendButton
import org.thoughtcrime.securesms.components.ViewBinderDelegate
@@ -402,6 +405,8 @@ class ConversationFragment :
companion object {
private val TAG = Log.tag(ConversationFragment::class.java)
private val POLL_SPINNER_DELAY = 500.milliseconds
private const val ACTION_PINNED_SHORTCUT = "action_pinned_shortcut"
private const val SAVED_STATE_IS_SEARCH_REQUESTED = "is_search_requested"
private const val EMOJI_SEARCH_FRAGMENT_TAG = "EmojiSearchFragment"
@@ -525,6 +530,7 @@ class ConversationFragment :
private val doubleTapToEditDebouncer = DoubleClickDebouncer(200)
private val recentEmojis: RecentEmojiPageModel by lazy { RecentEmojiPageModel(AppDependencies.application, TextSecurePreferences.RECENT_STORAGE_KEY) }
private val nicknameEditActivityLauncher = registerForActivityResult(NicknameActivity.Contract()) {}
private val handler = Handler(Looper.getMainLooper())
private lateinit var layoutManager: ConversationLayoutManager
private lateinit var markReadHelper: MarkReadHelper
@@ -559,6 +565,7 @@ class ConversationFragment :
private var dataObserver: DataObserver? = null
private var menuProvider: ConversationOptionsMenu.Provider? = null
private var scrollListener: ScrollListener? = null
private var progressDialog: ProgressCardDialogFragment? = null
private val jumpAndPulseScrollStrategy = object : ScrollToPositionDelegate.ScrollStrategy {
override fun performScroll(recyclerView: RecyclerView, layoutManager: LinearLayoutManager, position: Int, smooth: Boolean) {
@@ -2670,6 +2677,13 @@ class ConversationFragment :
val endPoll = viewModel.endPoll(pollId)
disposables += endPoll
.doOnSubscribe {
handler.postDelayed({ showSpinner() }, POLL_SPINNER_DELAY.inWholeMilliseconds)
}
.doFinally {
handler.removeCallbacksAndMessages(null)
hideSpinner()
}
.subscribeBy(
onError = {
Log.w(TAG, "Error received during poll end!", it)
@@ -2685,6 +2699,16 @@ class ConversationFragment :
.show()
}
private fun showSpinner() {
progressDialog = ProgressCardDialogFragment.create(getString(R.string.Poll__waiting_for_network))
progressDialog?.show(parentFragmentManager, null)
}
private fun hideSpinner() {
progressDialog?.dismissAllowingStateLoss()
progressDialog = null
}
private inner class SwipeAvailabilityProvider : ConversationItemSwipeCallback.SwipeAvailabilityProvider {
override fun isSwipeAvailable(conversationMessage: ConversationMessage): Boolean {
val recipient = viewModel.recipientSnapshot ?: return false

View File

@@ -43,6 +43,7 @@ import org.thoughtcrime.securesms.conversation.mutiselect.MultiselectPart
import org.thoughtcrime.securesms.conversation.v2.RequestReviewState.GroupReviewState
import org.thoughtcrime.securesms.conversation.v2.RequestReviewState.IndividualReviewState
import org.thoughtcrime.securesms.conversation.v2.data.ConversationDataSource
import org.thoughtcrime.securesms.crypto.ProfileKeyUtil
import org.thoughtcrime.securesms.crypto.ReentrantSessionLock
import org.thoughtcrime.securesms.database.GroupTable
import org.thoughtcrime.securesms.database.IdentityTable.VerifiedStatus
@@ -64,10 +65,16 @@ import org.thoughtcrime.securesms.database.model.databaseprotos.BodyRangeList
import org.thoughtcrime.securesms.database.model.databaseprotos.MessageExtras
import org.thoughtcrime.securesms.database.model.databaseprotos.PollTerminate
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.dependencies.AppDependencies.databaseObserver
import org.thoughtcrime.securesms.dependencies.AppDependencies.expiringMessageManager
import org.thoughtcrime.securesms.groups.GroupNotAMemberException
import org.thoughtcrime.securesms.jobs.GroupSendJobHelper
import org.thoughtcrime.securesms.jobs.MultiDeviceViewOnceOpenJob
import org.thoughtcrime.securesms.keyboard.KeyboardUtil
import org.thoughtcrime.securesms.keyvalue.SignalStore.Companion.settings
import org.thoughtcrime.securesms.linkpreview.LinkPreview
import org.thoughtcrime.securesms.messagerequests.MessageRequestState
import org.thoughtcrime.securesms.messages.GroupSendUtil
import org.thoughtcrime.securesms.mms.OutgoingMessage
import org.thoughtcrime.securesms.mms.PartAuthority
import org.thoughtcrime.securesms.mms.QuoteModel
@@ -78,9 +85,12 @@ import org.thoughtcrime.securesms.profiles.spoofing.ReviewRecipient
import org.thoughtcrime.securesms.providers.BlobProvider
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.recipients.RecipientUtil
import org.thoughtcrime.securesms.sms.MessageSender
import org.thoughtcrime.securesms.sms.MessageSender.PreUploadResult
import org.thoughtcrime.securesms.transport.UndeliverableMessageException
import org.thoughtcrime.securesms.util.DrawableUtil
import org.thoughtcrime.securesms.util.GroupUtil
import org.thoughtcrime.securesms.util.MediaUtil
import org.thoughtcrime.securesms.util.MessageUtil
import org.thoughtcrime.securesms.util.SignalLocalMetrics
@@ -90,9 +100,15 @@ import org.thoughtcrime.securesms.util.hasSharedContact
import org.thoughtcrime.securesms.util.hasTextSlide
import org.thoughtcrime.securesms.util.isViewOnceMessage
import org.thoughtcrime.securesms.util.requireTextSlide
import org.whispersystems.signalservice.api.crypto.ContentHint
import org.whispersystems.signalservice.api.messages.SendMessageResult
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage.Companion.newBuilder
import java.io.IOException
import kotlin.jvm.optionals.getOrNull
import kotlin.math.max
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
class ConversationRepository(
@@ -102,6 +118,7 @@ class ConversationRepository(
companion object {
private val TAG = Log.tag(ConversationRepository::class.java)
private val POLL_TERMINATE_TIMEOUT = 6000.milliseconds
}
private val applicationContext = localContext.applicationContext
@@ -199,6 +216,12 @@ class ConversationRepository(
val threadRecipient = SignalDatabase.threads.getRecipientForThreadId(messageRecord.threadId)!!
val pollSentTimestamp = messageRecord.dateSent
if (threadRecipient.groupId.getOrNull()?.isV2 != true) {
Log.w(TAG, "Missing group id")
emitter.tryOnError(Exception("Poll terminate failed"))
}
val groupId = threadRecipient.requireGroupId().requireV2()
val message = OutgoingMessage.pollTerminateMessage(
threadRecipient = threadRecipient,
sentTimeMillis = System.currentTimeMillis(),
@@ -208,18 +231,87 @@ class ConversationRepository(
Log.i(TAG, "Sending poll terminate to " + message.threadRecipient.id + ", thread: " + messageRecord.threadId)
MessageSender.sendPollAction(
AppDependencies.application,
message,
messageRecord.threadId,
MessageSender.SendType.SIGNAL,
null
) {
val possibleTargets: List<Recipient> = SignalDatabase.groups.getGroupMembers(groupId, GroupTable.MemberSet.FULL_MEMBERS_EXCLUDING_SELF)
.map { it.resolve() }
.distinctBy { it.id }
val eligibleTargets: List<Recipient> = RecipientUtil.getEligibleForSending(possibleTargets)
val results = sendEndPoll(threadRecipient, message, eligibleTargets)
val sendResults = GroupSendJobHelper.getCompletedSends(eligibleTargets, results)
if (sendResults.completed.isNotEmpty()) {
val allocatedThreadId = SignalDatabase.threads.getOrCreateValidThreadId(threadRecipient, messageRecord.threadId, message.distributionType)
val outgoingMessage = applyUniversalExpireTimerIfNecessary(applicationContext, threadRecipient, message, allocatedThreadId)
val insertResult = SignalDatabase.messages.insertMessageOutbox(outgoingMessage, allocatedThreadId, false, null)
val messageId = insertResult.messageId
SignalDatabase.threads.update(threadId = allocatedThreadId, unarchive = true, syncThreadDelete = true)
databaseObserver.notifyMessageUpdateObservers(MessageId(poll.messageId))
databaseObserver.notifyMessageInsertObservers(messageRecord.threadId, MessageId(messageId))
if (outgoingMessage.expiresIn > 0) {
SignalDatabase.messages.markExpireStarted(messageId)
expiringMessageManager.scheduleDeletion(messageId, true, message.expiresIn)
}
if (sendResults.skipped.isNotEmpty()) {
val messageRecord = SignalDatabase.messages.getMessageRecord(messageId)
val filterRecipientIds = (sendResults.skipped - sendResults.completed.map { it.id }).toSet()
Log.i(TAG, "Some recipients skipped when sending end poll. Resending to $filterRecipientIds")
MessageSender.resendGroupMessage(applicationContext, messageRecord, filterRecipientIds)
} else {
SignalDatabase.messages.markAsSent(messageId, true)
}
emitter.onComplete()
} else {
emitter.tryOnError(Exception("Poll terminate failed"))
}
}.subscribeOn(Schedulers.io())
}
@Throws(IOException::class, GroupNotAMemberException::class, UndeliverableMessageException::class)
fun sendEndPoll(group: Recipient, message: OutgoingMessage, destinations: List<Recipient>): List<SendMessageResult?> {
val groupId = group.requireGroupId().requireV2()
val groupRecord: GroupRecord? = SignalDatabase.groups.getGroup(group.requireGroupId()).getOrNull()
if (groupRecord != null && groupRecord.isAnnouncementGroup && !groupRecord.isAdmin(Recipient.self())) {
throw UndeliverableMessageException("Non-admins cannot send messages in announcement groups!")
}
val builder = newBuilder()
GroupUtil.setDataMessageGroupContext(AppDependencies.application, builder, groupId)
val sentTime = System.currentTimeMillis()
val groupMessage = builder
.withTimestamp(sentTime)
.withExpiration((message.expiresIn / 1000).toInt())
.withProfileKey(ProfileKeyUtil.getSelfProfileKey().serialize())
.withPollTerminate(SignalServiceDataMessage.PollTerminate(message.messageExtras!!.pollTerminate!!.targetTimestamp))
.build()
return GroupSendUtil.sendUnresendableDataMessage(
applicationContext,
groupId,
destinations,
false,
ContentHint.DEFAULT,
groupMessage,
false
) { System.currentTimeMillis() - sentTime > POLL_TERMINATE_TIMEOUT.inWholeMilliseconds }
}
private fun applyUniversalExpireTimerIfNecessary(context: Context, recipient: Recipient, outgoingMessage: OutgoingMessage, threadId: Long): OutgoingMessage {
if (!outgoingMessage.isExpirationUpdate && outgoingMessage.expiresIn == 0L) {
val expireTimerVersion = RecipientUtil.setAndSendUniversalExpireTimerIfNecessary(context, recipient, threadId)
if (expireTimerVersion != null) {
return outgoingMessage.withExpiry(settings.universalExpireTimer.seconds.inWholeMilliseconds, expireTimerVersion)
}
}
return outgoingMessage
}
fun sendMessage(
threadId: Long,
threadRecipient: Recipient,

View File

@@ -2159,6 +2159,12 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
}
fun markAsSentFailed(messageId: Long) {
// When a poll terminate fails, we ignore attempts to mark it as failed because we know that it was previously successfully sent to at least one person
val messageType = getMessageType(messageId)
if (MessageTypes.isPollTerminate(messageType)) {
Log.i(TAG, "Ignoring sent failed for poll terminate $messageId")
return
}
val threadId = getThreadIdForMessage(messageId)
updateMailboxBitmask(messageId, MessageTypes.BASE_TYPE_MASK, MessageTypes.BASE_SENT_FAILED_TYPE, Optional.of(threadId))
AppDependencies.databaseObserver.notifyMessageUpdateObservers(MessageId(messageId))
@@ -3715,6 +3721,15 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
return MessageTypes.isSentType(type)
}
fun getMessageType(messageId: Long): Long {
return readableDatabase
.select(TYPE)
.from(TABLE_NAME)
.where("$ID = ?", messageId)
.run()
.readToSingleLong()
}
fun getProfileChangeDetailsRecords(threadId: Long, afterTimestamp: Long): List<MessageRecord> {
val cursor = readableDatabase
.select(*MMS_PROJECTION)

View File

@@ -173,7 +173,8 @@ public class GroupCallUpdateSendJob extends BaseJob {
false,
ContentHint.DEFAULT,
dataMessage,
false);
false,
null);
if (includesSelf) {
results.add(AppDependencies.getSignalServiceMessageSender().sendSyncMessage(dataMessage));

View File

@@ -12,14 +12,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
final class GroupSendJobHelper {
public final class GroupSendJobHelper {
private static final String TAG = Log.tag(GroupSendJobHelper.class);
private GroupSendJobHelper() {
}
static @NonNull SendResult getCompletedSends(@NonNull List<Recipient> possibleRecipients, @NonNull Collection<SendMessageResult> results) {
public static @NonNull SendResult getCompletedSends(@NonNull List<Recipient> possibleRecipients, @NonNull Collection<SendMessageResult> results) {
RecipientAccessList accessList = new RecipientAccessList(possibleRecipients);
List<Recipient> completions = new ArrayList<>(results.size());
List<RecipientId> skipped = new ArrayList<>();
@@ -48,6 +48,11 @@ final class GroupSendJobHelper {
skipped.add(recipient.getId());
}
if (sendMessageResult.isCanceledFailure()) {
Log.w(TAG, "Canceled result " + recipient.getId());
skipped.add(recipient.getId());
}
if (sendMessageResult.getSuccess() != null ||
sendMessageResult.getIdentityFailure() != null ||
sendMessageResult.getProofRequiredFailure() != null ||

View File

@@ -185,7 +185,7 @@ public class ProfileKeySendJob extends BaseJob {
.withTimestamp(System.currentTimeMillis())
.withProfileKey(Recipient.self().resolve().getProfileKey());
List<SendMessageResult> results = GroupSendUtil.sendUnresendableDataMessage(context, null, destinations, false, ContentHint.IMPLICIT, dataMessage.build(), false);
List<SendMessageResult> results = GroupSendUtil.sendUnresendableDataMessage(context, null, destinations, false, ContentHint.IMPLICIT, dataMessage.build(), false, null);
ProofRequiredException proofRequired = Stream.of(results).filter(r -> r.getProofRequiredFailure() != null).findLast().map(SendMessageResult::getProofRequiredFailure).orElse(null);
GroupSendJobHelper.SendResult groupResult = GroupSendJobHelper.getCompletedSends(destinations, results);

View File

@@ -180,7 +180,7 @@ public final class PushGroupSilentUpdateSendJob extends BaseJob {
.asGroupMessage(group)
.build();
List<SendMessageResult> results = GroupSendUtil.sendUnresendableDataMessage(context, groupId, destinations, false, ContentHint.IMPLICIT, groupDataMessage, false);
List<SendMessageResult> results = GroupSendUtil.sendUnresendableDataMessage(context, groupId, destinations, false, ContentHint.IMPLICIT, groupDataMessage, false, null);
GroupSendJobHelper.SendResult groupResult = GroupSendJobHelper.getCompletedSends(destinations, results);

View File

@@ -132,10 +132,11 @@ public final class GroupSendUtil {
boolean isRecipientUpdate,
ContentHint contentHint,
@NonNull SignalServiceDataMessage message,
boolean urgent)
boolean urgent,
CancelationSignal cancelationSignal)
throws IOException, UntrustedIdentityException
{
return sendMessage(context, groupId, getDistributionId(groupId), null, allTargets, isRecipientUpdate, false, DataSendOperation.unresendable(message, contentHint, urgent), null);
return sendMessage(context, groupId, getDistributionId(groupId), null, allTargets, isRecipientUpdate, false, DataSendOperation.unresendable(message, contentHint, urgent), cancelationSignal);
}
/**
@@ -392,6 +393,11 @@ public final class GroupSendUtil {
final AtomicLong entryId = new AtomicLong(-1);
final boolean includeInMessageLog = sendOperation.shouldIncludeInMessageLog();
if (cancelationSignal != null && cancelationSignal.isCanceled()) {
Log.i(TAG, "Send canceled before any sends took place. Returning an empty list.");
return Collections.emptyList();
}
List<SendMessageResult> results = sendOperation.sendWithSenderKey(messageSender, distributionId, targets, access, groupSendEndorsements, isRecipientUpdate, partialResults -> {
if (!includeInMessageLog) {
return;

View File

@@ -37,7 +37,6 @@ import org.thoughtcrime.securesms.database.AttachmentTable;
import org.thoughtcrime.securesms.database.MessageTable;
import org.thoughtcrime.securesms.database.MessageTable.InsertResult;
import org.thoughtcrime.securesms.database.NoSuchMessageException;
import org.thoughtcrime.securesms.database.PollTables;
import org.thoughtcrime.securesms.database.RecipientTable;
import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.database.ThreadTable;
@@ -63,7 +62,6 @@ import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.mediasend.Media;
import org.thoughtcrime.securesms.mms.MmsException;
import org.thoughtcrime.securesms.mms.OutgoingMessage;
import org.thoughtcrime.securesms.mms.QuoteModel;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.recipients.RecipientUtil;
@@ -80,6 +78,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;