From 142979ce9331560d9bc37458651d6d8a3b08fead Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Mon, 21 Mar 2022 18:40:17 -0400 Subject: [PATCH] Add job to clean up early message receipts. --- .../database/model/ServiceMessageId.kt | 16 +++ .../securesms/jobmanager/Job.java | 2 +- .../securesms/jobmanager/JobController.java | 8 ++ .../securesms/jobmanager/JobManager.java | 11 +++ .../securesms/jobs/JobManagerFactories.java | 1 + .../jobs/PushProcessEarlyMessagesJob.kt | 98 +++++++++++++++++++ .../messages/MessageContentProcessor.java | 15 +++ .../securesms/util/EarlyMessageCache.java | 43 +++----- 8 files changed, 166 insertions(+), 28 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/database/model/ServiceMessageId.kt create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessEarlyMessagesJob.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/model/ServiceMessageId.kt b/app/src/main/java/org/thoughtcrime/securesms/database/model/ServiceMessageId.kt new file mode 100644 index 0000000000..d9a73a1d09 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/database/model/ServiceMessageId.kt @@ -0,0 +1,16 @@ +package org.thoughtcrime.securesms.database.model + +import org.thoughtcrime.securesms.recipients.RecipientId + +/** + * Represents the messages "ID" from the service's perspective, which identifies messages via a + * a (sender, timestamp) pair. + */ +data class ServiceMessageId( + val sender: RecipientId, + val sentTimestamp: Long +) { + override fun toString(): String { + return "MessageId($sender, $sentTimestamp)" + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java index 56ca66df73..eb292e0c58 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/Job.java @@ -242,7 +242,7 @@ public abstract class Job { public static final class Parameters { public static final String MIGRATION_QUEUE_KEY = "MIGRATION"; - public static final int IMMORTAL = -1; + public static final long IMMORTAL = -1; public static final int UNLIMITED = -1; private final String id; diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index bee8e1f10f..dede570bf7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; /** * Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to @@ -190,6 +191,13 @@ class JobController { notifyAll(); } + @WorkerThread + synchronized List findJobs(@NonNull Predicate predicate) { + return Stream.of(jobStorage.getAllJobSpecs()) + .filter(predicate::test) + .toList(); + } + @WorkerThread synchronized void onRetry(@NonNull Job job, long backoffInterval) { if (backoffInterval <= 0) { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index cc8e11d1fd..bc49d7bafe 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -13,6 +13,7 @@ import org.signal.core.util.ThreadUtil; import org.signal.core.util.logging.Log; import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; +import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage; import org.thoughtcrime.securesms.util.Debouncer; import org.thoughtcrime.securesms.util.TextSecurePreferences; @@ -33,6 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; /** * Allows the scheduling of durable jobs that will be run as early as possible. @@ -234,6 +236,15 @@ public class JobManager implements ConstraintObserver.Notifier { runOnExecutor(() -> jobController.update(updater)); } + /** + * Search through the list of pending jobs and find all that match a given predicate. Note that there will always be races here, and the result you get back + * may not be valid anymore by the time you get it. Use with caution. + */ + public @NonNull List find(@NonNull Predicate predicate) { + waitUntilInitialized(); + return jobController.findJobs(predicate); + } + /** * Runs the specified job synchronously. Beware: All normal dependencies are respected, meaning * you must take great care where you call this. It could take a very long time to complete! diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index c7eeb4dd75..90936c3bfd 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -140,6 +140,7 @@ public final class JobManagerFactories { put(PushGroupUpdateJob.KEY, new PushGroupUpdateJob.Factory()); put(PushMediaSendJob.KEY, new PushMediaSendJob.Factory()); put(PushNotificationReceiveJob.KEY, new PushNotificationReceiveJob.Factory()); + put(PushProcessEarlyMessagesJob.KEY, new PushProcessEarlyMessagesJob.Factory()); put(PushProcessMessageJob.KEY, new PushProcessMessageJob.Factory()); put(PushTextSendJob.KEY, new PushTextSendJob.Factory()); put(ReactionSendJob.KEY, new ReactionSendJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessEarlyMessagesJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessEarlyMessagesJob.kt new file mode 100644 index 0000000000..72d043b88c --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessEarlyMessagesJob.kt @@ -0,0 +1,98 @@ +package org.thoughtcrime.securesms.jobs + +import org.signal.core.util.logging.Log +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.database.model.ServiceMessageId +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies +import org.thoughtcrime.securesms.jobmanager.Data +import org.thoughtcrime.securesms.jobmanager.Job +import org.thoughtcrime.securesms.messages.MessageContentProcessor +import org.whispersystems.signalservice.api.messages.SignalServiceContent +import java.lang.Exception +import java.util.Optional + +/** + * A job that should be enqueued whenever we process a message that we think has arrived "early" (see [org.thoughtcrime.securesms.util.EarlyMessageCache]). + * It will go through and process all of those early messages (if we have found a "match"), ordered by sentTimestamp. + */ +class PushProcessEarlyMessagesJob private constructor(parameters: Parameters) : BaseJob(parameters) { + + private constructor() : + this( + Parameters.Builder() + .setMaxInstancesForFactory(2) + .setMaxAttempts(1) + .setLifespan(Parameters.IMMORTAL) + .build() + ) + + override fun getFactoryKey(): String { + return KEY + } + + override fun serialize(): Data { + return Data.EMPTY + } + + override fun onRun() { + val earlyIds: List = ApplicationDependencies.getEarlyMessageCache().allReferencedIds + .filter { SignalDatabase.mmsSms.getMessageFor(it.sentTimestamp, it.sender) != null } + .sortedBy { it.sentTimestamp } + + if (earlyIds.isNotEmpty()) { + Log.i(TAG, "There are ${earlyIds.size} items in the early message cache with matches.") + + for (id: ServiceMessageId in earlyIds) { + val contents: Optional> = ApplicationDependencies.getEarlyMessageCache().retrieve(id.sender, id.sentTimestamp) + + if (contents.isPresent) { + for (content: SignalServiceContent in contents.get()) { + Log.i(TAG, "[${id.sentTimestamp}] Processing early content for $id") + MessageContentProcessor(context).process(MessageContentProcessor.MessageState.DECRYPTED_OK, content, null, id.sentTimestamp, -1) + } + } else { + Log.w(TAG, "[${id.sentTimestamp}] Saw $id in the cache, but when we went to retrieve it, it was already gone.") + } + } + } else { + Log.i(TAG, "There are no items in the early message cache with matches.") + } + } + + override fun onShouldRetry(e: Exception): Boolean { + return false + } + + override fun onFailure() { + } + + class Factory : Job.Factory { + override fun create(parameters: Parameters, data: Data): PushProcessEarlyMessagesJob { + return PushProcessEarlyMessagesJob(parameters) + } + } + + companion object { + private val TAG = Log.tag(PushProcessEarlyMessagesJob::class.java) + + const val KEY = "PushProcessEarlyMessageJob" + + /** + * Enqueues a job to run after the most-recently-enqueued [PushProcessMessageJob]. + */ + @JvmStatic + fun enqueue() { + val jobManger = ApplicationDependencies.getJobManager() + + val youngestProcessJobId: String? = jobManger.find { it.factoryKey == PushProcessMessageJob.KEY } + .maxByOrNull { it.createTime } + ?.id + + if (youngestProcessJobId != null) { + jobManger.add(PushProcessEarlyMessagesJob(), listOf(youngestProcessJobId)) + } else { + jobManger.add(PushProcessEarlyMessagesJob()) + } + } + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageContentProcessor.java b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageContentProcessor.java index 68e1e9472c..80bf4e2784 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageContentProcessor.java +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageContentProcessor.java @@ -79,6 +79,7 @@ import org.thoughtcrime.securesms.jobs.NullMessageSendJob; import org.thoughtcrime.securesms.jobs.PaymentLedgerUpdateJob; import org.thoughtcrime.securesms.jobs.PaymentTransactionCheckJob; import org.thoughtcrime.securesms.jobs.ProfileKeySendJob; +import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob; import org.thoughtcrime.securesms.jobs.PushProcessMessageJob; import org.thoughtcrime.securesms.jobs.RefreshAttributesJob; import org.thoughtcrime.securesms.jobs.RefreshOwnProfileJob; @@ -896,6 +897,7 @@ public final class MessageContentProcessor { if (targetMessage == null) { warn(String.valueOf(content.getTimestamp()), "[handleReaction] Could not find matching message! Putting it in the early message cache. timestamp: " + reaction.getTargetSentTimestamp() + " author: " + targetAuthor.getId()); ApplicationDependencies.getEarlyMessageCache().store(targetAuthor.getId(), reaction.getTargetSentTimestamp(), content); + PushProcessEarlyMessagesJob.enqueue(); return null; } @@ -952,6 +954,7 @@ public final class MessageContentProcessor { } else if (targetMessage == null) { warn(String.valueOf(content.getTimestamp()), "[handleRemoteDelete] Could not find matching message! timestamp: " + delete.getTargetSentTimestamp() + " author: " + senderRecipient.getId()); ApplicationDependencies.getEarlyMessageCache().store(senderRecipient.getId(), delete.getTargetSentTimestamp(), content); + PushProcessEarlyMessagesJob.enqueue(); return null; } else { warn(String.valueOf(content.getTimestamp()), String.format(Locale.ENGLISH, "[handleRemoteDelete] Invalid remote delete! deleteTime: %d, targetTime: %d, deleteAuthor: %s, targetAuthor: %s", @@ -2133,6 +2136,10 @@ public final class MessageContentProcessor { warn(String.valueOf(content.getTimestamp()), "[handleViewedReceipt] Could not find matching message! timestamp: " + id.getTimetamp() + " author: " + senderRecipient.getId()); ApplicationDependencies.getEarlyMessageCache().store(senderRecipient.getId(), id.getTimetamp(), content); } + + if (unhandled.size() > 0) { + PushProcessEarlyMessagesJob.enqueue(); + } } @SuppressLint("DefaultLocale") @@ -2153,6 +2160,10 @@ public final class MessageContentProcessor { // Early delivery receipts are special-cased in the database methods } + if (unhandled.size() > 0) { + PushProcessEarlyMessagesJob.enqueue(); + } + SignalDatabase.messageLog().deleteEntriesForRecipient(message.getTimestamps(), senderRecipient.getId(), content.getSenderDevice()); } @@ -2178,6 +2189,10 @@ public final class MessageContentProcessor { warn(String.valueOf(content.getTimestamp()), "[handleReadReceipt] Could not find matching message! timestamp: " + id.getTimetamp() + " author: " + senderRecipient.getId()); ApplicationDependencies.getEarlyMessageCache().store(senderRecipient.getId(), id.getTimetamp(), content); } + + if (unhandled.size() > 0) { + PushProcessEarlyMessagesJob.enqueue(); + } } private void handleTypingMessage(@NonNull SignalServiceContent content, diff --git a/app/src/main/java/org/thoughtcrime/securesms/util/EarlyMessageCache.java b/app/src/main/java/org/thoughtcrime/securesms/util/EarlyMessageCache.java index eb1ae5ac0a..317b659280 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/util/EarlyMessageCache.java +++ b/app/src/main/java/org/thoughtcrime/securesms/util/EarlyMessageCache.java @@ -2,9 +2,14 @@ package org.thoughtcrime.securesms.util; import androidx.annotation.NonNull; +import org.thoughtcrime.securesms.database.MessageDatabase; +import org.thoughtcrime.securesms.database.model.MessageId; +import org.thoughtcrime.securesms.database.model.ServiceMessageId; import org.thoughtcrime.securesms.recipients.RecipientId; import org.whispersystems.signalservice.api.messages.SignalServiceContent; +import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -17,14 +22,14 @@ import java.util.Optional; */ public final class EarlyMessageCache { - private final LRUCache> cache = new LRUCache<>(100); + private final LRUCache> cache = new LRUCache<>(100); /** * @param targetSender The sender of the message this message depends on. * @param targetSentTimestamp The sent timestamp of the message this message depends on. */ - public void store(@NonNull RecipientId targetSender, long targetSentTimestamp, @NonNull SignalServiceContent content) { - MessageId messageId = new MessageId(targetSender, targetSentTimestamp); + public synchronized void store(@NonNull RecipientId targetSender, long targetSentTimestamp, @NonNull SignalServiceContent content) { + ServiceMessageId messageId = new ServiceMessageId(targetSender, targetSentTimestamp); List contentList = cache.get(messageId); if (contentList == null) { @@ -41,31 +46,15 @@ public final class EarlyMessageCache { * @param sender The sender of the message in question. * @param sentTimestamp The sent timestamp of the message in question. */ - public Optional> retrieve(@NonNull RecipientId sender, long sentTimestamp) { - return Optional.ofNullable(cache.remove(new MessageId(sender, sentTimestamp))); + public synchronized Optional> retrieve(@NonNull RecipientId sender, long sentTimestamp) { + return Optional.ofNullable(cache.remove(new ServiceMessageId(sender, sentTimestamp))); } - private static final class MessageId { - private final RecipientId sender; - private final long sentTimestamp; - - private MessageId(@NonNull RecipientId sender, long sentTimestamp) { - this.sender = sender; - this.sentTimestamp = sentTimestamp; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MessageId messageId = (MessageId) o; - return sentTimestamp == messageId.sentTimestamp && - Objects.equals(sender, messageId.sender); - } - - @Override - public int hashCode() { - return Objects.hash(sentTimestamp, sender); - } + /** + * Returns a collection of all of the {@link ServiceMessageId}s referenced in the cache at the moment of inquiry. + * Caution: There is no guarantee that this list will be relevant for any amount of time afterwards. + */ + public synchronized @NonNull Collection getAllReferencedIds() { + return new HashSet<>(cache.keySet()); } }