From 1210b2af0f86b0ceb65e616e7cdd71fe56866b0c Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Thu, 16 Mar 2023 10:42:32 -0400 Subject: [PATCH] Some additional decryption perf improvements. --- .../securesms/testing/AliceClient.kt | 6 +- .../crypto/UnidentifiedAccessUtil.java | 22 +++++-- .../securesms/jobmanager/JobController.java | 66 ++++++++++++++----- .../securesms/jobmanager/JobLogger.java | 28 -------- .../securesms/jobmanager/JobLogger.kt | 26 ++++++++ .../securesms/jobmanager/JobManager.java | 17 ++++- .../securesms/jobs/PreKeysSyncJob.kt | 6 +- .../messages/IncomingMessageObserver.kt | 43 ++++++------ .../securesms/messages/MessageDecryptor.kt | 44 +++++++------ 9 files changed, 163 insertions(+), 95 deletions(-) delete mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.java create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt diff --git a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt index 4ce8c37580..615430b425 100644 --- a/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt +++ b/app/src/androidTest/java/org/thoughtcrime/securesms/testing/AliceClient.kt @@ -37,7 +37,11 @@ class AliceClient(val serviceId: ServiceId, val e164: String, val trustRoot: ECK fun process(envelope: Envelope, serverDeliveredTimestamp: Long) { val start = System.currentTimeMillis() val bufferedStore = BufferedProtocolStore.create() - ApplicationDependencies.getIncomingMessageObserver().processEnvelope(bufferedStore, envelope, serverDeliveredTimestamp) + ApplicationDependencies.getIncomingMessageObserver() + .processEnvelope(bufferedStore, envelope, serverDeliveredTimestamp) + ?.mapNotNull { it.run() } + ?.forEach { ApplicationDependencies.getJobManager().add(it) } + bufferedStore.flushToDisk() val end = System.currentTimeMillis() Log.d(TAG, "${end - start}") diff --git a/app/src/main/java/org/thoughtcrime/securesms/crypto/UnidentifiedAccessUtil.java b/app/src/main/java/org/thoughtcrime/securesms/crypto/UnidentifiedAccessUtil.java index 9538c7b9eb..24c4236a36 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/crypto/UnidentifiedAccessUtil.java +++ b/app/src/main/java/org/thoughtcrime/securesms/crypto/UnidentifiedAccessUtil.java @@ -44,12 +44,7 @@ public class UnidentifiedAccessUtil { private static final byte[] UNRESTRICTED_KEY = new byte[16]; public static CertificateValidator getCertificateValidator() { - try { - ECPublicKey unidentifiedSenderTrustRoot = Curve.decodePoint(Base64.decode(BuildConfig.UNIDENTIFIED_SENDER_TRUST_ROOT), 0); - return new CertificateValidator(unidentifiedSenderTrustRoot); - } catch (InvalidKeyException | IOException e) { - throw new AssertionError(e); - } + return CertificateValidatorHolder.INSTANCE.certificateValidator; } @WorkerThread @@ -209,4 +204,19 @@ public class UnidentifiedAccessUtil { return accessKey; } + + private enum CertificateValidatorHolder { + INSTANCE; + + final CertificateValidator certificateValidator = buildCertificateValidator(); + + private static CertificateValidator buildCertificateValidator() { + try { + ECPublicKey unidentifiedSenderTrustRoot = Curve.decodePoint(Base64.decode(BuildConfig.UNIDENTIFIED_SENDER_TRUST_ROOT), 0); + return new CertificateValidator(unidentifiedSenderTrustRoot); + } catch (InvalidKeyException | IOException e) { + throw new AssertionError(e); + } + } + } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java index 934dd00ace..cd925967c1 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobController.java @@ -156,6 +156,41 @@ class JobController { } } + @WorkerThread + void submitJobs(@NonNull List jobs) { + List canRun = new ArrayList<>(jobs.size()); + + synchronized (this) { + for (Job job : jobs) { + if (exceedsMaximumInstances(job)) { + jobTracker.onStateChange(job, JobTracker.JobState.IGNORED); + Log.w(TAG, JobLogger.format(job, "Already at the max instance count. Factory limit: " + job.getParameters().getMaxInstancesForFactory() + ", Queue limit: " + job.getParameters().getMaxInstancesForQueue() + ". Skipping.")); + } else { + canRun.add(job); + } + } + + if (canRun.isEmpty()) { + return; + } + + List fullSpecs = canRun.stream().map(it -> buildFullSpec(it, Collections.emptyList())).collect(java.util.stream.Collectors.toList()); + jobStorage.insertJobs(fullSpecs); + + scheduleJobs(canRun); + } + + // We have no control over what happens in jobs' onSubmit method, so we drop our lock to reduce the possibility of a deadlock + for (Job job : canRun) { + job.setContext(application); + job.onSubmit(); + } + + synchronized (this) { + notifyAll(); + } + } + @WorkerThread synchronized void cancelJob(@NonNull String id) { Job runningJob = runningJobs.get(id); @@ -359,25 +394,26 @@ class JobController { @WorkerThread private boolean chainExceedsMaximumInstances(@NonNull List> chain) { if (chain.size() == 1 && chain.get(0).size() == 1) { - Job solo = chain.get(0).get(0); + return exceedsMaximumInstances(chain.get(0).get(0)); + } else { + return false; + } + } - boolean exceedsFactory = solo.getParameters().getMaxInstancesForFactory() != Job.Parameters.UNLIMITED && - jobStorage.getJobCountForFactory(solo.getFactoryKey()) >= solo.getParameters().getMaxInstancesForFactory(); + @WorkerThread + private boolean exceedsMaximumInstances(@NonNull Job job) { + boolean exceedsFactory = job.getParameters().getMaxInstancesForFactory() != Job.Parameters.UNLIMITED && + jobStorage.getJobCountForFactory(job.getFactoryKey()) >= job.getParameters().getMaxInstancesForFactory(); - if (exceedsFactory) { - return true; - } - - boolean exceedsQueue = solo.getParameters().getQueue() != null && - solo.getParameters().getMaxInstancesForQueue() != Job.Parameters.UNLIMITED && - jobStorage.getJobCountForFactoryAndQueue(solo.getFactoryKey(), solo.getParameters().getQueue()) >= solo.getParameters().getMaxInstancesForQueue(); - - if (exceedsQueue) { - return true; - } + if (exceedsFactory) { + return true; } - return false; + boolean exceedsQueue = job.getParameters().getQueue() != null && + job.getParameters().getMaxInstancesForQueue() != Job.Parameters.UNLIMITED && + jobStorage.getJobCountForFactoryAndQueue(job.getFactoryKey(), job.getParameters().getQueue()) >= job.getParameters().getMaxInstancesForQueue(); + + return exceedsQueue; } @WorkerThread diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.java deleted file mode 100644 index 062783abed..0000000000 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.thoughtcrime.securesms.jobmanager; - -import android.text.TextUtils; - -import androidx.annotation.NonNull; - -import java.util.Locale; - -public class JobLogger { - - public static String format(@NonNull Job job, @NonNull String event) { - return format(job, "", event); - } - - public static String format(@NonNull Job job, @NonNull String extraTag, @NonNull String event) { - String id = job.getId(); - String tag = TextUtils.isEmpty(extraTag) ? "" : "[" + extraTag + "]"; - long timeSinceSubmission = System.currentTimeMillis() - job.getParameters().getCreateTime(); - int runAttempt = job.getRunAttempt() + 1; - String maxAttempts = job.getParameters().getMaxAttempts() == Job.Parameters.UNLIMITED ? "Unlimited" - : String.valueOf(job.getParameters().getMaxAttempts()); - String lifespan = job.getParameters().getLifespan() == Job.Parameters.IMMORTAL ? "Immortal" - : String.valueOf(job.getParameters().getLifespan()) + " ms"; - return String.format(Locale.US, - "[%s][%s]%s %s (Time Since Submission: %d ms, Lifespan: %s, Run Attempt: %d/%s, Queue: %s)", - "JOB::" + id, job.getClass().getSimpleName(), tag, event, timeSinceSubmission, lifespan, runAttempt, maxAttempts, job.getParameters().getQueue()); - } -} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt new file mode 100644 index 0000000000..f70becf54b --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobLogger.kt @@ -0,0 +1,26 @@ +package org.thoughtcrime.securesms.jobmanager + +import android.text.TextUtils + +/** + * Provides utilities to create consistent logging for jobs. + */ +object JobLogger { + + @JvmStatic + fun format(job: Job, event: String): String { + return format(job, "", event) + } + + @JvmStatic + fun format(job: Job, extraTag: String, event: String): String { + val id = job.id + val tag = if (TextUtils.isEmpty(extraTag)) "" else "[$extraTag]" + val timeSinceSubmission = System.currentTimeMillis() - job.parameters.createTime + val runAttempt = job.runAttempt + 1 + val maxAttempts = if (job.parameters.maxAttempts == Job.Parameters.UNLIMITED) "Unlimited" else job.parameters.maxAttempts.toString() + val lifespan = if (job.parameters.lifespan == Job.Parameters.IMMORTAL) "Immortal" else job.parameters.lifespan.toString() + " ms" + + return "[JOB::$id][${job.javaClass.simpleName}]$tag $event (Time Since Submission: $timeSinceSubmission ms, Lifespan: $lifespan, Run Attempt: $runAttempt/$maxAttempts, Queue: ${job.parameters.queue})" + } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java index 2fb20a5643..dc3a9d68c0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java @@ -12,7 +12,6 @@ import androidx.annotation.WorkerThread; import org.signal.core.util.ThreadUtil; import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; import org.thoughtcrime.securesms.jobmanager.persistence.JobSpec; @@ -177,7 +176,6 @@ public class JobManager implements ConstraintObserver.Notifier { runOnExecutor(() -> { jobController.submitJobWithExistingDependencies(job, Collections.emptyList(), dependsOnQueue); - jobController.wakeUp(); }); } @@ -190,7 +188,20 @@ public class JobManager implements ConstraintObserver.Notifier { runOnExecutor(() -> { jobController.submitJobWithExistingDependencies(job, dependsOn, dependsOnQueue); - jobController.wakeUp(); + }); + } + + public void addAll(@NonNull List jobs) { + if (jobs.isEmpty()) { + return; + } + + for (Job job : jobs) { + jobTracker.onStateChange(job, JobTracker.JobState.PENDING); + } + + runOnExecutor(() -> { + jobController.submitJobs(jobs); }); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt index 1888e3114d..90c350905e 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PreKeysSyncJob.kt @@ -35,10 +35,14 @@ class PreKeysSyncJob private constructor(private val forceRotate: Boolean = fals private const val PREKEY_MINIMUM = 10 private val REFRESH_INTERVAL = TimeUnit.DAYS.toMillis(3) + fun create(forceRotate: Boolean = false): PreKeysSyncJob { + return PreKeysSyncJob(forceRotate) + } + @JvmStatic @JvmOverloads fun enqueue(forceRotate: Boolean = false) { - ApplicationDependencies.getJobManager().add(PreKeysSyncJob(forceRotate)) + ApplicationDependencies.getJobManager().add(create(forceRotate)) } @JvmStatic diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt index aa4beccfe2..8e7db2ac87 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/IncomingMessageObserver.kt @@ -30,6 +30,7 @@ import org.thoughtcrime.securesms.jobs.PushDecryptMessageJob import org.thoughtcrime.securesms.jobs.PushProcessMessageJob import org.thoughtcrime.securesms.jobs.UnableToStartException import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation import org.thoughtcrime.securesms.messages.protocol.BufferedProtocolStore import org.thoughtcrime.securesms.notifications.NotificationChannels import org.thoughtcrime.securesms.recipients.RecipientId @@ -253,7 +254,7 @@ class IncomingMessageObserver(private val context: Application) { } @VisibleForTesting - fun processEnvelope(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List? { + fun processEnvelope(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List? { return when (envelope.type.number) { SignalServiceProtos.Envelope.Type.RECEIPT_VALUE -> { processReceipt(envelope) @@ -274,36 +275,33 @@ class IncomingMessageObserver(private val context: Application) { } } - private fun processMessage(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List { + private fun processMessage(bufferedProtocolStore: BufferedProtocolStore, envelope: SignalServiceProtos.Envelope, serverDeliveredTimestamp: Long): List { val result = MessageDecryptor.decrypt(context, bufferedProtocolStore, envelope, serverDeliveredTimestamp) - when (result) { + val extraJob: Job? = when (result) { is MessageDecryptor.Result.Success -> { - ApplicationDependencies.getJobManager().add( - PushProcessMessageJob( - result.toMessageState(), - result.toSignalServiceContent(), - null, - -1, - result.envelope.timestamp - ) + PushProcessMessageJob( + result.toMessageState(), + result.toSignalServiceContent(), + null, + -1, + result.envelope.timestamp ) } is MessageDecryptor.Result.Error -> { - ApplicationDependencies.getJobManager().add( - PushProcessMessageJob( - result.toMessageState(), - null, - result.errorMetadata.toExceptionMetadata(), - -1, - result.envelope.timestamp - ) + PushProcessMessageJob( + result.toMessageState(), + null, + result.errorMetadata.toExceptionMetadata(), + -1, + result.envelope.timestamp ) } is MessageDecryptor.Result.Ignore -> { // No action needed + null } else -> { @@ -311,7 +309,7 @@ class IncomingMessageObserver(private val context: Application) { } } - return result.followUpOperations + return result.followUpOperations + FollowUpOperation { extraJob } } private fun processReceipt(envelope: SignalServiceProtos.Envelope) { @@ -412,13 +410,14 @@ class IncomingMessageObserver(private val context: Application) { val startTime = System.currentTimeMillis() ReentrantSessionLock.INSTANCE.acquire().use { SignalDatabase.rawDatabase.withinTransaction { - val followUpOperations = batch + val followUpOperations: List = batch .mapNotNull { processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) } .flatten() bufferedStore.flushToDisk() - followUpOperations.forEach { it.run() } + val jobs = followUpOperations.mapNotNull { it.run() } + ApplicationDependencies.getJobManager().addAll(jobs) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt index f2425dcd71..7de4b7c6d8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/messages/MessageDecryptor.kt @@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase import org.thoughtcrime.securesms.dependencies.ApplicationDependencies import org.thoughtcrime.securesms.groups.BadGroupIdException import org.thoughtcrime.securesms.groups.GroupId +import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobs.AutomaticSessionResetJob import org.thoughtcrime.securesms.jobs.PreKeysSyncJob import org.thoughtcrime.securesms.jobs.SendRetryReceiptJob @@ -103,11 +104,11 @@ object MessageDecryptor { return Result.Ignore(envelope, serverDeliveredTimestamp, emptyList()) } - val followUpOperations: MutableList = mutableListOf() + val followUpOperations: MutableList = mutableListOf() if (envelope.type == Envelope.Type.PREKEY_BUNDLE) { - followUpOperations += Runnable { - PreKeysSyncJob.enqueue() + followUpOperations += FollowUpOperation { + PreKeysSyncJob.create() } } @@ -120,7 +121,7 @@ object MessageDecryptor { if (cipherResult == null) { Log.w(TAG, "${logPrefix(envelope)} Decryption resulted in a null result!", true) - return Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations) + return Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations.toUnmodifiableList()) } Log.d(TAG, "${logPrefix(envelope, cipherResult)} Successfully decrypted the envelope.") @@ -129,12 +130,12 @@ object MessageDecryptor { if (validationResult is EnvelopeContentValidator.Result.Invalid) { Log.w(TAG, "${logPrefix(envelope, cipherResult)} Invalid content! ${validationResult.reason}", validationResult.throwable) - return Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations) + return Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations.toUnmodifiableList()) } if (validationResult is EnvelopeContentValidator.Result.UnsupportedDataMessage) { Log.w(TAG, "${logPrefix(envelope, cipherResult)} Unsupported DataMessage! Our version: ${validationResult.ourVersion}, their version: ${validationResult.theirVersion}") - return Result.UnsupportedDataMessage(envelope, serverDeliveredTimestamp, cipherResult.toErrorMetadata(), followUpOperations) + return Result.UnsupportedDataMessage(envelope, serverDeliveredTimestamp, cipherResult.toErrorMetadata(), followUpOperations.toUnmodifiableList()) } // Must handle SKDM's immediately, because subsequent decryptions could rely on it @@ -185,9 +186,9 @@ object MessageDecryptor { } else { Log.w(TAG, "${logPrefix(envelope, e)} Retry receipts disabled! Enqueuing a session reset job, which will also insert an error message.", e, true) - followUpOperations += Runnable { + followUpOperations += FollowUpOperation { val sender: Recipient = Recipient.external(context, e.sender) - ApplicationDependencies.getJobManager().add(AutomaticSessionResetJob(sender.id, e.senderDevice, envelope.timestamp)) + AutomaticSessionResetJob(sender.id, e.senderDevice, envelope.timestamp) } Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations.toUnmodifiableList()) @@ -233,7 +234,7 @@ object MessageDecryptor { context: Context, envelope: Envelope, serverDeliveredTimestamp: Long, - followUpOperations: MutableList, + followUpOperations: MutableList, protocolException: ProtocolException ): Result { val contentHint: ContentHint = ContentHint.fromType(protocolException.contentHint) @@ -241,8 +242,8 @@ object MessageDecryptor { val receivedTimestamp: Long = System.currentTimeMillis() val sender: Recipient = Recipient.external(context, protocolException.sender) - followUpOperations += Runnable { - ApplicationDependencies.getJobManager().add(buildSendRetryReceiptJob(envelope, protocolException, sender)) + followUpOperations += FollowUpOperation { + buildSendRetryReceiptJob(envelope, protocolException, sender) } return when (contentHint) { @@ -254,7 +255,7 @@ object MessageDecryptor { ContentHint.RESENDABLE -> { Log.w(TAG, "${logPrefix(envelope)} The content hint is $contentHint, so we can try to resend the message.", true) - followUpOperations += Runnable { + followUpOperations += FollowUpOperation { val groupId: GroupId? = protocolException.parseGroupId(envelope) val threadId: Long = if (groupId != null) { val groupRecipient: Recipient = Recipient.externalPossiblyMigratedGroup(groupId) @@ -265,6 +266,7 @@ object MessageDecryptor { ApplicationDependencies.getPendingRetryReceiptCache().insert(sender.id, senderDevice, envelope.timestamp, receivedTimestamp, threadId) ApplicationDependencies.getPendingRetryReceiptManager().scheduleIfNecessary() + null } Result.Ignore(envelope, serverDeliveredTimestamp, followUpOperations) @@ -437,7 +439,7 @@ object MessageDecryptor { sealed interface Result { val envelope: Envelope val serverDeliveredTimestamp: Long - val followUpOperations: List + val followUpOperations: List /** Successfully decrypted the envelope content. The plaintext [Content] is available. */ class Success( @@ -445,7 +447,7 @@ object MessageDecryptor { override val serverDeliveredTimestamp: Long, val content: Content, val metadata: EnvelopeMetadata, - override val followUpOperations: List + override val followUpOperations: List ) : Result /** We could not decrypt the message, and an error should be inserted into the user's chat history. */ @@ -453,7 +455,7 @@ object MessageDecryptor { override val envelope: Envelope, override val serverDeliveredTimestamp: Long, override val errorMetadata: ErrorMetadata, - override val followUpOperations: List + override val followUpOperations: List ) : Result, Error /** The envelope used an invalid version of the Signal protocol. */ @@ -461,7 +463,7 @@ object MessageDecryptor { override val envelope: Envelope, override val serverDeliveredTimestamp: Long, override val errorMetadata: ErrorMetadata, - override val followUpOperations: List + override val followUpOperations: List ) : Result, Error /** The envelope used an old format that hasn't been used since 2015. This shouldn't be happening. */ @@ -469,7 +471,7 @@ object MessageDecryptor { override val envelope: Envelope, override val serverDeliveredTimestamp: Long, override val errorMetadata: ErrorMetadata, - override val followUpOperations: List + override val followUpOperations: List ) : Result, Error /** @@ -480,14 +482,14 @@ object MessageDecryptor { override val envelope: Envelope, override val serverDeliveredTimestamp: Long, override val errorMetadata: ErrorMetadata, - override val followUpOperations: List + override val followUpOperations: List ) : Result, Error /** There are no further results from this envelope that need to be processed. There may still be [followUpOperations]. */ class Ignore( override val envelope: Envelope, override val serverDeliveredTimestamp: Long, - override val followUpOperations: List + override val followUpOperations: List ) : Result interface Error { @@ -500,4 +502,8 @@ object MessageDecryptor { val senderDevice: Int, val groupId: GroupId? ) + + fun interface FollowUpOperation { + fun run(): Job? + } }