Add job to clean up early message receipts.

This commit is contained in:
Greyson Parrelli
2022-03-21 18:40:17 -04:00
parent 093dd7c62c
commit 142979ce93
8 changed files with 166 additions and 28 deletions

View File

@@ -0,0 +1,16 @@
package org.thoughtcrime.securesms.database.model
import org.thoughtcrime.securesms.recipients.RecipientId
/**
* Represents the messages "ID" from the service's perspective, which identifies messages via a
* a (sender, timestamp) pair.
*/
data class ServiceMessageId(
val sender: RecipientId,
val sentTimestamp: Long
) {
override fun toString(): String {
return "MessageId($sender, $sentTimestamp)"
}
}

View File

@@ -242,7 +242,7 @@ public abstract class Job {
public static final class Parameters {
public static final String MIGRATION_QUEUE_KEY = "MIGRATION";
public static final int IMMORTAL = -1;
public static final long IMMORTAL = -1;
public static final int UNLIMITED = -1;
private final String id;

View File

@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
/**
* Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to
@@ -190,6 +191,13 @@ class JobController {
notifyAll();
}
@WorkerThread
synchronized List<JobSpec> findJobs(@NonNull Predicate<JobSpec> predicate) {
return Stream.of(jobStorage.getAllJobSpecs())
.filter(predicate::test)
.toList();
}
@WorkerThread
synchronized void onRetry(@NonNull Job job, long backoffInterval) {
if (backoffInterval <= 0) {

View File

@@ -13,6 +13,7 @@ import org.signal.core.util.ThreadUtil;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.util.Debouncer;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
@@ -33,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
/**
* Allows the scheduling of durable jobs that will be run as early as possible.
@@ -234,6 +236,15 @@ public class JobManager implements ConstraintObserver.Notifier {
runOnExecutor(() -> jobController.update(updater));
}
/**
* Search through the list of pending jobs and find all that match a given predicate. Note that there will always be races here, and the result you get back
* may not be valid anymore by the time you get it. Use with caution.
*/
public @NonNull List<JobSpec> find(@NonNull Predicate<JobSpec> predicate) {
waitUntilInitialized();
return jobController.findJobs(predicate);
}
/**
* Runs the specified job synchronously. Beware: All normal dependencies are respected, meaning
* you must take great care where you call this. It could take a very long time to complete!

View File

@@ -140,6 +140,7 @@ public final class JobManagerFactories {
put(PushGroupUpdateJob.KEY, new PushGroupUpdateJob.Factory());
put(PushMediaSendJob.KEY, new PushMediaSendJob.Factory());
put(PushNotificationReceiveJob.KEY, new PushNotificationReceiveJob.Factory());
put(PushProcessEarlyMessagesJob.KEY, new PushProcessEarlyMessagesJob.Factory());
put(PushProcessMessageJob.KEY, new PushProcessMessageJob.Factory());
put(PushTextSendJob.KEY, new PushTextSendJob.Factory());
put(ReactionSendJob.KEY, new ReactionSendJob.Factory());

View File

@@ -0,0 +1,98 @@
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.ServiceMessageId
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Data
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.messages.MessageContentProcessor
import org.whispersystems.signalservice.api.messages.SignalServiceContent
import java.lang.Exception
import java.util.Optional
/**
* A job that should be enqueued whenever we process a message that we think has arrived "early" (see [org.thoughtcrime.securesms.util.EarlyMessageCache]).
* It will go through and process all of those early messages (if we have found a "match"), ordered by sentTimestamp.
*/
class PushProcessEarlyMessagesJob private constructor(parameters: Parameters) : BaseJob(parameters) {
private constructor() :
this(
Parameters.Builder()
.setMaxInstancesForFactory(2)
.setMaxAttempts(1)
.setLifespan(Parameters.IMMORTAL)
.build()
)
override fun getFactoryKey(): String {
return KEY
}
override fun serialize(): Data {
return Data.EMPTY
}
override fun onRun() {
val earlyIds: List<ServiceMessageId> = ApplicationDependencies.getEarlyMessageCache().allReferencedIds
.filter { SignalDatabase.mmsSms.getMessageFor(it.sentTimestamp, it.sender) != null }
.sortedBy { it.sentTimestamp }
if (earlyIds.isNotEmpty()) {
Log.i(TAG, "There are ${earlyIds.size} items in the early message cache with matches.")
for (id: ServiceMessageId in earlyIds) {
val contents: Optional<List<SignalServiceContent>> = ApplicationDependencies.getEarlyMessageCache().retrieve(id.sender, id.sentTimestamp)
if (contents.isPresent) {
for (content: SignalServiceContent in contents.get()) {
Log.i(TAG, "[${id.sentTimestamp}] Processing early content for $id")
MessageContentProcessor(context).process(MessageContentProcessor.MessageState.DECRYPTED_OK, content, null, id.sentTimestamp, -1)
}
} else {
Log.w(TAG, "[${id.sentTimestamp}] Saw $id in the cache, but when we went to retrieve it, it was already gone.")
}
}
} else {
Log.i(TAG, "There are no items in the early message cache with matches.")
}
}
override fun onShouldRetry(e: Exception): Boolean {
return false
}
override fun onFailure() {
}
class Factory : Job.Factory<PushProcessEarlyMessagesJob> {
override fun create(parameters: Parameters, data: Data): PushProcessEarlyMessagesJob {
return PushProcessEarlyMessagesJob(parameters)
}
}
companion object {
private val TAG = Log.tag(PushProcessEarlyMessagesJob::class.java)
const val KEY = "PushProcessEarlyMessageJob"
/**
* Enqueues a job to run after the most-recently-enqueued [PushProcessMessageJob].
*/
@JvmStatic
fun enqueue() {
val jobManger = ApplicationDependencies.getJobManager()
val youngestProcessJobId: String? = jobManger.find { it.factoryKey == PushProcessMessageJob.KEY }
.maxByOrNull { it.createTime }
?.id
if (youngestProcessJobId != null) {
jobManger.add(PushProcessEarlyMessagesJob(), listOf(youngestProcessJobId))
} else {
jobManger.add(PushProcessEarlyMessagesJob())
}
}
}
}

View File

@@ -79,6 +79,7 @@ import org.thoughtcrime.securesms.jobs.NullMessageSendJob;
import org.thoughtcrime.securesms.jobs.PaymentLedgerUpdateJob;
import org.thoughtcrime.securesms.jobs.PaymentTransactionCheckJob;
import org.thoughtcrime.securesms.jobs.ProfileKeySendJob;
import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob;
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
import org.thoughtcrime.securesms.jobs.RefreshAttributesJob;
import org.thoughtcrime.securesms.jobs.RefreshOwnProfileJob;
@@ -896,6 +897,7 @@ public final class MessageContentProcessor {
if (targetMessage == null) {
warn(String.valueOf(content.getTimestamp()), "[handleReaction] Could not find matching message! Putting it in the early message cache. timestamp: " + reaction.getTargetSentTimestamp() + " author: " + targetAuthor.getId());
ApplicationDependencies.getEarlyMessageCache().store(targetAuthor.getId(), reaction.getTargetSentTimestamp(), content);
PushProcessEarlyMessagesJob.enqueue();
return null;
}
@@ -952,6 +954,7 @@ public final class MessageContentProcessor {
} else if (targetMessage == null) {
warn(String.valueOf(content.getTimestamp()), "[handleRemoteDelete] Could not find matching message! timestamp: " + delete.getTargetSentTimestamp() + " author: " + senderRecipient.getId());
ApplicationDependencies.getEarlyMessageCache().store(senderRecipient.getId(), delete.getTargetSentTimestamp(), content);
PushProcessEarlyMessagesJob.enqueue();
return null;
} else {
warn(String.valueOf(content.getTimestamp()), String.format(Locale.ENGLISH, "[handleRemoteDelete] Invalid remote delete! deleteTime: %d, targetTime: %d, deleteAuthor: %s, targetAuthor: %s",
@@ -2133,6 +2136,10 @@ public final class MessageContentProcessor {
warn(String.valueOf(content.getTimestamp()), "[handleViewedReceipt] Could not find matching message! timestamp: " + id.getTimetamp() + " author: " + senderRecipient.getId());
ApplicationDependencies.getEarlyMessageCache().store(senderRecipient.getId(), id.getTimetamp(), content);
}
if (unhandled.size() > 0) {
PushProcessEarlyMessagesJob.enqueue();
}
}
@SuppressLint("DefaultLocale")
@@ -2153,6 +2160,10 @@ public final class MessageContentProcessor {
// Early delivery receipts are special-cased in the database methods
}
if (unhandled.size() > 0) {
PushProcessEarlyMessagesJob.enqueue();
}
SignalDatabase.messageLog().deleteEntriesForRecipient(message.getTimestamps(), senderRecipient.getId(), content.getSenderDevice());
}
@@ -2178,6 +2189,10 @@ public final class MessageContentProcessor {
warn(String.valueOf(content.getTimestamp()), "[handleReadReceipt] Could not find matching message! timestamp: " + id.getTimetamp() + " author: " + senderRecipient.getId());
ApplicationDependencies.getEarlyMessageCache().store(senderRecipient.getId(), id.getTimetamp(), content);
}
if (unhandled.size() > 0) {
PushProcessEarlyMessagesJob.enqueue();
}
}
private void handleTypingMessage(@NonNull SignalServiceContent content,

View File

@@ -2,9 +2,14 @@ package org.thoughtcrime.securesms.util;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.database.MessageDatabase;
import org.thoughtcrime.securesms.database.model.MessageId;
import org.thoughtcrime.securesms.database.model.ServiceMessageId;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -17,14 +22,14 @@ import java.util.Optional;
*/
public final class EarlyMessageCache {
private final LRUCache<MessageId, List<SignalServiceContent>> cache = new LRUCache<>(100);
private final LRUCache<ServiceMessageId, List<SignalServiceContent>> cache = new LRUCache<>(100);
/**
* @param targetSender The sender of the message this message depends on.
* @param targetSentTimestamp The sent timestamp of the message this message depends on.
*/
public void store(@NonNull RecipientId targetSender, long targetSentTimestamp, @NonNull SignalServiceContent content) {
MessageId messageId = new MessageId(targetSender, targetSentTimestamp);
public synchronized void store(@NonNull RecipientId targetSender, long targetSentTimestamp, @NonNull SignalServiceContent content) {
ServiceMessageId messageId = new ServiceMessageId(targetSender, targetSentTimestamp);
List<SignalServiceContent> contentList = cache.get(messageId);
if (contentList == null) {
@@ -41,31 +46,15 @@ public final class EarlyMessageCache {
* @param sender The sender of the message in question.
* @param sentTimestamp The sent timestamp of the message in question.
*/
public Optional<List<SignalServiceContent>> retrieve(@NonNull RecipientId sender, long sentTimestamp) {
return Optional.ofNullable(cache.remove(new MessageId(sender, sentTimestamp)));
public synchronized Optional<List<SignalServiceContent>> retrieve(@NonNull RecipientId sender, long sentTimestamp) {
return Optional.ofNullable(cache.remove(new ServiceMessageId(sender, sentTimestamp)));
}
private static final class MessageId {
private final RecipientId sender;
private final long sentTimestamp;
private MessageId(@NonNull RecipientId sender, long sentTimestamp) {
this.sender = sender;
this.sentTimestamp = sentTimestamp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MessageId messageId = (MessageId) o;
return sentTimestamp == messageId.sentTimestamp &&
Objects.equals(sender, messageId.sender);
}
@Override
public int hashCode() {
return Objects.hash(sentTimestamp, sender);
}
/**
* Returns a collection of all of the {@link ServiceMessageId}s referenced in the cache at the moment of inquiry.
* Caution: There is no guarantee that this list will be relevant for any amount of time afterwards.
*/
public synchronized @NonNull Collection<ServiceMessageId> getAllReferencedIds() {
return new HashSet<>(cache.keySet());
}
}