mirror of
https://github.com/signalapp/Signal-Android.git
synced 2025-12-22 12:08:34 +00:00
Drop V2 suffix from MCPv2 classes.
This commit is contained in:
committed by
Clark Chen
parent
18f9c6b1f0
commit
f566e10710
@@ -46,13 +46,13 @@ class EditMessageSyncProcessorTest {
|
||||
@get:Rule
|
||||
val harness = SignalActivityRule()
|
||||
|
||||
private lateinit var processorV2: MessageContentProcessorV2
|
||||
private lateinit var processorV2: MessageContentProcessor
|
||||
private lateinit var testResult: TestResults
|
||||
private var envelopeTimestamp: Long = 0
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
processorV2 = MessageContentProcessorV2(harness.context)
|
||||
processorV2 = MessageContentProcessor(harness.context)
|
||||
envelopeTimestamp = System.currentTimeMillis()
|
||||
testResult = TestResults()
|
||||
}
|
||||
|
||||
@@ -20,17 +20,17 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos.GroupC
|
||||
|
||||
@Suppress("ClassName")
|
||||
@RunWith(AndroidJUnit4::class)
|
||||
class MessageContentProcessorV2__recipientStatusTest {
|
||||
class MessageContentProcessor__recipientStatusTest {
|
||||
|
||||
@get:Rule
|
||||
val harness = SignalActivityRule()
|
||||
|
||||
private lateinit var processorV2: MessageContentProcessorV2
|
||||
private lateinit var processor: MessageContentProcessor
|
||||
private var envelopeTimestamp: Long = 0
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
processorV2 = MessageContentProcessorV2(harness.context)
|
||||
processor = MessageContentProcessor(harness.context)
|
||||
envelopeTimestamp = System.currentTimeMillis()
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ class MessageContentProcessorV2__recipientStatusTest {
|
||||
timestamp = envelopeTimestamp
|
||||
}
|
||||
|
||||
processorV2.process(
|
||||
processor.process(
|
||||
envelope = MessageContentFuzzer.envelope(envelopeTimestamp),
|
||||
content = MessageContentFuzzer.syncSentTextMessage(initialTextMessage, deliveredTo = listOf(harness.others[0])),
|
||||
metadata = MessageContentFuzzer.envelopeMetadata(harness.self.id, harness.self.id, groupId),
|
||||
@@ -61,7 +61,7 @@ class MessageContentProcessorV2__recipientStatusTest {
|
||||
val firstMessageId = firstSyncMessages[0].id
|
||||
val firstReceiptInfo = SignalDatabase.groupReceipts.getGroupReceiptInfo(firstMessageId)
|
||||
|
||||
processorV2.process(
|
||||
processor.process(
|
||||
envelope = MessageContentFuzzer.envelope(envelopeTimestamp),
|
||||
content = MessageContentFuzzer.syncSentTextMessage(initialTextMessage, deliveredTo = listOf(harness.others[0], harness.others[1]), recipientUpdate = true),
|
||||
metadata = MessageContentFuzzer.envelopeMetadata(harness.self.id, harness.self.id, groupId),
|
||||
@@ -59,14 +59,14 @@ class MessageProcessingPerformanceTest {
|
||||
mockkStatic(UnidentifiedAccessUtil::class)
|
||||
every { UnidentifiedAccessUtil.getCertificateValidator() } returns FakeClientHelpers.noOpCertificateValidator
|
||||
|
||||
mockkObject(MessageContentProcessorV2)
|
||||
every { MessageContentProcessorV2.create(harness.application) } returns TimingMessageContentProcessorV2(harness.application)
|
||||
mockkObject(MessageContentProcessor)
|
||||
every { MessageContentProcessor.create(harness.application) } returns TimingMessageContentProcessor(harness.application)
|
||||
}
|
||||
|
||||
@After
|
||||
fun after() {
|
||||
unmockkStatic(UnidentifiedAccessUtil::class)
|
||||
unmockkStatic(MessageContentProcessorV2::class)
|
||||
unmockkStatic(MessageContentProcessor::class)
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -107,7 +107,7 @@ class MessageProcessingPerformanceTest {
|
||||
// Wait until they've all been fully decrypted + processed
|
||||
harness
|
||||
.inMemoryLogger
|
||||
.getLockForUntil(TimingMessageContentProcessorV2.endTagPredicate(lastTimestamp))
|
||||
.getLockForUntil(TimingMessageContentProcessor.endTagPredicate(lastTimestamp))
|
||||
.awaitFor(1.minutes)
|
||||
|
||||
harness.inMemoryLogger.flush()
|
||||
@@ -126,7 +126,7 @@ class MessageProcessingPerformanceTest {
|
||||
|
||||
// Calculate MessageContentProcessor
|
||||
|
||||
val takeLast: List<Entry> = entries.filter { it.tag == TimingMessageContentProcessorV2.TAG }.drop(2)
|
||||
val takeLast: List<Entry> = entries.filter { it.tag == TimingMessageContentProcessor.TAG }.drop(2)
|
||||
val iterator = takeLast.iterator()
|
||||
var processCount = 0L
|
||||
var processDuration = 0L
|
||||
@@ -142,7 +142,7 @@ class MessageProcessingPerformanceTest {
|
||||
// Calculate messages per second from "retrieving" first message post session initialization to processing last message
|
||||
|
||||
val start = entries.first { it.message == "Retrieved envelope! $firstTimestamp" }
|
||||
val end = entries.first { it.message == TimingMessageContentProcessorV2.endTag(lastTimestamp) }
|
||||
val end = entries.first { it.message == TimingMessageContentProcessor.endTag(lastTimestamp) }
|
||||
|
||||
val duration = (end.timestamp - start.timestamp).toFloat() / 1000f
|
||||
val messagePerSecond = messageCount.toFloat() / duration
|
||||
@@ -157,7 +157,7 @@ class MessageProcessingPerformanceTest {
|
||||
|
||||
val aliceProcessFirstMessageLatch = harness
|
||||
.inMemoryLogger
|
||||
.getLockForUntil(TimingMessageContentProcessorV2.endTagPredicate(firstPreKeyMessageTimestamp))
|
||||
.getLockForUntil(TimingMessageContentProcessor.endTagPredicate(firstPreKeyMessageTimestamp))
|
||||
|
||||
Thread { aliceClient.process(encryptedEnvelope, System.currentTimeMillis()) }.start()
|
||||
aliceProcessFirstMessageLatch.awaitFor(15.seconds)
|
||||
|
||||
@@ -7,9 +7,9 @@ import org.thoughtcrime.securesms.util.SignalLocalMetrics
|
||||
import org.whispersystems.signalservice.api.crypto.EnvelopeMetadata
|
||||
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
|
||||
|
||||
class TimingMessageContentProcessorV2(context: Context) : MessageContentProcessorV2(context) {
|
||||
class TimingMessageContentProcessor(context: Context) : MessageContentProcessor(context) {
|
||||
companion object {
|
||||
val TAG = Log.tag(TimingMessageContentProcessorV2::class.java)
|
||||
val TAG = Log.tag(TimingMessageContentProcessor::class.java)
|
||||
|
||||
fun endTagPredicate(timestamp: Long): LogPredicate = { entry ->
|
||||
entry.tag == TAG && entry.message == endTag(timestamp)
|
||||
@@ -40,7 +40,7 @@ import org.thoughtcrime.securesms.jobs.JobManagerFactories;
|
||||
import org.thoughtcrime.securesms.jobs.MarkerJob;
|
||||
import org.thoughtcrime.securesms.jobs.PreKeysSyncJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushGroupSendJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2;
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob;
|
||||
import org.thoughtcrime.securesms.jobs.ReactionSendJob;
|
||||
import org.thoughtcrime.securesms.jobs.TypingSendJob;
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore;
|
||||
@@ -170,7 +170,7 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
|
||||
.setConstraintObservers(JobManagerFactories.getConstraintObservers(context))
|
||||
.setJobStorage(new FastJobStorage(JobDatabase.getInstance(context)))
|
||||
.setJobMigrator(new JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context)))
|
||||
.addReservedJobRunner(new FactoryJobPredicate(PushProcessMessageJobV2.KEY, MarkerJob.KEY))
|
||||
.addReservedJobRunner(new FactoryJobPredicate(PushProcessMessageJob.KEY, MarkerJob.KEY))
|
||||
.addReservedJobRunner(new FactoryJobPredicate(IndividualSendJob.KEY, PushGroupSendJob.KEY, ReactionSendJob.KEY, TypingSendJob.KEY, GroupCallUpdateSendJob.KEY))
|
||||
.build();
|
||||
return new JobManager(context, config);
|
||||
|
||||
@@ -5,7 +5,7 @@ import org.signal.core.util.logging.Log
|
||||
import org.thoughtcrime.securesms.jobmanager.JobMigration
|
||||
import org.thoughtcrime.securesms.jobmanager.JsonJobData
|
||||
import org.thoughtcrime.securesms.jobs.FailingJob
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageErrorV2Job
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageErrorJob
|
||||
import org.thoughtcrime.securesms.messages.MessageState
|
||||
import org.thoughtcrime.securesms.util.Base64
|
||||
import org.whispersystems.signalservice.api.crypto.protos.CompleteMessage
|
||||
@@ -48,7 +48,7 @@ class PushProcessMessageJobMigration : JobMigration(10) {
|
||||
|
||||
else -> {
|
||||
Log.i(TAG, "Migrating push process error job for state: $state")
|
||||
jobData.withFactoryKey(PushProcessMessageErrorV2Job.KEY)
|
||||
jobData.withFactoryKey(PushProcessMessageErrorJob.KEY)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -50,7 +50,7 @@ public class AutomaticSessionResetJob extends BaseJob {
|
||||
|
||||
public AutomaticSessionResetJob(@NonNull RecipientId recipientId, int deviceId, long sentTimestamp) {
|
||||
this(new Parameters.Builder()
|
||||
.setQueue(PushProcessMessageJobV2.getQueueName(recipientId))
|
||||
.setQueue(PushProcessMessageJob.getQueueName(recipientId))
|
||||
.addConstraint(DecryptionsDrainedConstraint.KEY)
|
||||
.setMaxInstancesForQueue(1)
|
||||
.build(),
|
||||
|
||||
@@ -37,7 +37,7 @@ internal class CallLinkPeekJob private constructor(
|
||||
|
||||
constructor(callLinkRecipientId: RecipientId) : this(
|
||||
Parameters.Builder()
|
||||
.setQueue(PushProcessMessageJobV2.getQueueName(callLinkRecipientId))
|
||||
.setQueue(PushProcessMessageJob.getQueueName(callLinkRecipientId))
|
||||
.setMaxInstancesForQueue(1)
|
||||
.setLifespan(TimeUnit.MINUTES.toMillis(1))
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
|
||||
@@ -37,7 +37,7 @@ final class ForceUpdateGroupV2WorkerJob extends BaseJob {
|
||||
private final GroupId.V2 groupId;
|
||||
|
||||
ForceUpdateGroupV2WorkerJob(@NonNull GroupId.V2 groupId) {
|
||||
this(new Parameters.Builder().setQueue(PushProcessMessageJobV2.getQueueName(Recipient.externalGroupExact(groupId).getId()))
|
||||
this(new Parameters.Builder().setQueue(PushProcessMessageJob.getQueueName(Recipient.externalGroupExact(groupId).getId()))
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
.setMaxAttempts(Parameters.UNLIMITED)
|
||||
.build(),
|
||||
|
||||
@@ -21,7 +21,7 @@ final class GroupCallPeekWorkerJob extends BaseJob {
|
||||
|
||||
public GroupCallPeekWorkerJob(@NonNull RecipientId groupRecipientId) {
|
||||
this(new Parameters.Builder()
|
||||
.setQueue(PushProcessMessageJobV2.getQueueName(groupRecipientId))
|
||||
.setQueue(PushProcessMessageJob.getQueueName(groupRecipientId))
|
||||
.setMaxInstancesForQueue(2)
|
||||
.build(),
|
||||
groupRecipientId);
|
||||
|
||||
@@ -170,8 +170,8 @@ public final class JobManagerFactories {
|
||||
put(PushGroupSilentUpdateSendJob.KEY, new PushGroupSilentUpdateSendJob.Factory());
|
||||
put(MessageFetchJob.KEY, new MessageFetchJob.Factory());
|
||||
put(PushProcessEarlyMessagesJob.KEY, new PushProcessEarlyMessagesJob.Factory());
|
||||
put(PushProcessMessageErrorV2Job.KEY, new PushProcessMessageErrorV2Job.Factory());
|
||||
put(PushProcessMessageJobV2.KEY, new PushProcessMessageJobV2.Factory());
|
||||
put(PushProcessMessageErrorJob.KEY, new PushProcessMessageErrorJob.Factory());
|
||||
put(PushProcessMessageJob.KEY, new PushProcessMessageJob.Factory());
|
||||
put(ReactionSendJob.KEY, new ReactionSendJob.Factory());
|
||||
put(RebuildMessageSearchIndexJob.KEY, new RebuildMessageSearchIndexJob.Factory());
|
||||
put(RefreshAttributesJob.KEY, new RefreshAttributesJob.Factory());
|
||||
|
||||
@@ -20,7 +20,7 @@ class LeaveGroupV2WorkerJob(parameters: Parameters, private val groupId: GroupId
|
||||
|
||||
constructor(groupId: GroupId.V2) : this(
|
||||
parameters = Parameters.Builder()
|
||||
.setQueue(PushProcessMessageJobV2.getQueueName(Recipient.externalGroupExact(groupId).id))
|
||||
.setQueue(PushProcessMessageJob.getQueueName(Recipient.externalGroupExact(groupId).id))
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
.setMaxAttempts(Parameters.UNLIMITED)
|
||||
.setMaxInstancesForQueue(2)
|
||||
|
||||
@@ -6,7 +6,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase
|
||||
import org.thoughtcrime.securesms.database.model.ServiceMessageId
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor
|
||||
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
|
||||
|
||||
/**
|
||||
@@ -46,7 +46,7 @@ class PushProcessEarlyMessagesJob private constructor(parameters: Parameters) :
|
||||
if (earlyEntries != null) {
|
||||
for (entry in earlyEntries) {
|
||||
Log.i(TAG, "[${id.sentTimestamp}] Processing early V2 content for $id")
|
||||
MessageContentProcessorV2.create(context).process(entry.envelope, entry.content, entry.metadata, entry.serverDeliveredTimestamp, processingEarlyContent = true)
|
||||
MessageContentProcessor.create(context).process(entry.envelope, entry.content, entry.metadata, entry.serverDeliveredTimestamp, processingEarlyContent = true)
|
||||
}
|
||||
} else {
|
||||
Log.w(TAG, "[${id.sentTimestamp}] Saw $id in the cache, but when we went to retrieve it, it was already gone.")
|
||||
@@ -76,13 +76,13 @@ class PushProcessEarlyMessagesJob private constructor(parameters: Parameters) :
|
||||
const val KEY = "PushProcessEarlyMessageJob"
|
||||
|
||||
/**
|
||||
* Enqueues a job to run after the most-recently-enqueued [PushProcessMessageJobV2].
|
||||
* Enqueues a job to run after the most-recently-enqueued [PushProcessMessageJob].
|
||||
*/
|
||||
@JvmStatic
|
||||
fun enqueue() {
|
||||
val jobManger = ApplicationDependencies.getJobManager()
|
||||
|
||||
val youngestProcessJobId: String? = jobManger.find { it.factoryKey == PushProcessMessageJobV2.KEY }
|
||||
val youngestProcessJobId: String? = jobManger.find { it.factoryKey == PushProcessMessageJob.KEY }
|
||||
.maxByOrNull { it.createTime }
|
||||
?.id
|
||||
|
||||
|
||||
@@ -14,14 +14,14 @@ import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.JsonJobData
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.ChangeNumberConstraint
|
||||
import org.thoughtcrime.securesms.messages.ExceptionMetadata
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor
|
||||
import org.thoughtcrime.securesms.messages.MessageState
|
||||
import org.thoughtcrime.securesms.recipients.Recipient
|
||||
|
||||
/**
|
||||
* Process messages that did not decrypt/validate successfully.
|
||||
*/
|
||||
class PushProcessMessageErrorV2Job private constructor(
|
||||
class PushProcessMessageErrorJob private constructor(
|
||||
parameters: Parameters,
|
||||
private val messageState: MessageState,
|
||||
private val exceptionMetadata: ExceptionMetadata,
|
||||
@@ -55,15 +55,15 @@ class PushProcessMessageErrorV2Job private constructor(
|
||||
return
|
||||
}
|
||||
|
||||
MessageContentProcessorV2.create(context).processException(messageState, exceptionMetadata, timestamp)
|
||||
MessageContentProcessor.create(context).processException(messageState, exceptionMetadata, timestamp)
|
||||
}
|
||||
|
||||
override fun onShouldRetry(e: Exception): Boolean = false
|
||||
|
||||
override fun onFailure() = Unit
|
||||
|
||||
class Factory : Job.Factory<PushProcessMessageErrorV2Job?> {
|
||||
override fun create(parameters: Parameters, serializedData: ByteArray?): PushProcessMessageErrorV2Job {
|
||||
class Factory : Job.Factory<PushProcessMessageErrorJob?> {
|
||||
override fun create(parameters: Parameters, serializedData: ByteArray?): PushProcessMessageErrorJob {
|
||||
val data = JsonJobData.deserialize(serializedData)
|
||||
|
||||
val state = MessageState.values()[data.getInt(KEY_MESSAGE_STATE)]
|
||||
@@ -75,14 +75,14 @@ class PushProcessMessageErrorV2Job private constructor(
|
||||
groupId = GroupId.parseNullableOrThrow(data.getStringOrDefault(KEY_EXCEPTION_GROUP_ID, null))
|
||||
)
|
||||
|
||||
return PushProcessMessageErrorV2Job(parameters, state, exceptionMetadata, data.getLong(KEY_TIMESTAMP))
|
||||
return PushProcessMessageErrorJob(parameters, state, exceptionMetadata, data.getLong(KEY_TIMESTAMP))
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val KEY = "PushProcessMessageErrorV2Job"
|
||||
|
||||
val TAG = Log.tag(PushProcessMessageErrorV2Job::class.java)
|
||||
val TAG = Log.tag(PushProcessMessageErrorJob::class.java)
|
||||
|
||||
private const val KEY_MESSAGE_STATE = "message_state"
|
||||
private const val KEY_TIMESTAMP = "timestamp"
|
||||
@@ -99,7 +99,7 @@ class PushProcessMessageErrorV2Job private constructor(
|
||||
return Parameters.Builder()
|
||||
.setMaxAttempts(Parameters.UNLIMITED)
|
||||
.addConstraint(ChangeNumberConstraint.KEY)
|
||||
.setQueue(PushProcessMessageJobV2.getQueueName(recipient.id))
|
||||
.setQueue(PushProcessMessageJob.getQueueName(recipient.id))
|
||||
.build()
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@ import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
|
||||
import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.ChangeNumberConstraint
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor
|
||||
import org.thoughtcrime.securesms.messages.MessageDecryptor
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
@@ -21,16 +21,15 @@ import org.whispersystems.signalservice.api.crypto.protos.CompleteMessage
|
||||
import org.whispersystems.signalservice.api.groupsv2.NoCredentialForRedemptionTimeException
|
||||
import org.whispersystems.signalservice.api.push.ServiceId
|
||||
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException
|
||||
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
|
||||
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Content
|
||||
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.whispersystems.signalservice.api.crypto.protos.EnvelopeMetadata as EnvelopeMetadataProto
|
||||
|
||||
class PushProcessMessageJobV2 private constructor(
|
||||
class PushProcessMessageJob private constructor(
|
||||
parameters: Parameters,
|
||||
private val envelope: SignalServiceProtos.Envelope,
|
||||
private val envelope: Envelope,
|
||||
private val content: Content,
|
||||
private val metadata: EnvelopeMetadata,
|
||||
private val serverDeliveredTimestamp: Long
|
||||
@@ -59,7 +58,7 @@ class PushProcessMessageJobV2 private constructor(
|
||||
}
|
||||
|
||||
public override fun onRun() {
|
||||
val processor = MessageContentProcessorV2.create(context)
|
||||
val processor = MessageContentProcessor.create(context)
|
||||
processor.process(envelope, content, metadata, serverDeliveredTimestamp)
|
||||
}
|
||||
|
||||
@@ -71,11 +70,11 @@ class PushProcessMessageJobV2 private constructor(
|
||||
|
||||
override fun onFailure() = Unit
|
||||
|
||||
class Factory : Job.Factory<PushProcessMessageJobV2?> {
|
||||
override fun create(parameters: Parameters, data: ByteArray?): PushProcessMessageJobV2 {
|
||||
class Factory : Job.Factory<PushProcessMessageJob?> {
|
||||
override fun create(parameters: Parameters, data: ByteArray?): PushProcessMessageJob {
|
||||
return try {
|
||||
val completeMessage = CompleteMessage.ADAPTER.decode(data!!)
|
||||
PushProcessMessageJobV2(
|
||||
PushProcessMessageJob(
|
||||
parameters = parameters,
|
||||
envelope = Envelope.parseFrom(completeMessage.envelope.toByteArray()),
|
||||
content = Content.parseFrom(completeMessage.content.toByteArray()),
|
||||
@@ -99,7 +98,7 @@ class PushProcessMessageJobV2 private constructor(
|
||||
const val KEY = "PushProcessMessageJobV2"
|
||||
const val QUEUE_PREFIX = "__PUSH_PROCESS_JOB__"
|
||||
|
||||
private val TAG = Log.tag(PushProcessMessageJobV2::class.java)
|
||||
private val TAG = Log.tag(PushProcessMessageJob::class.java)
|
||||
|
||||
/**
|
||||
* Cache to keep track of empty 1:1 processing queues. Once a 1:1 queue is empty
|
||||
@@ -114,7 +113,7 @@ class PushProcessMessageJobV2 private constructor(
|
||||
return QUEUE_PREFIX + recipientId.toQueueKey()
|
||||
}
|
||||
|
||||
fun processOrDefer(messageProcessor: MessageContentProcessorV2, result: MessageDecryptor.Result.Success, localReceiveMetric: SignalLocalMetrics.MessageReceive): PushProcessMessageJobV2? {
|
||||
fun processOrDefer(messageProcessor: MessageContentProcessor, result: MessageDecryptor.Result.Success, localReceiveMetric: SignalLocalMetrics.MessageReceive): PushProcessMessageJob? {
|
||||
val queueName: String
|
||||
|
||||
val groupContext = GroupUtil.getGroupContextIfPresent(result.content)
|
||||
@@ -146,7 +145,7 @@ class PushProcessMessageJobV2 private constructor(
|
||||
if (requireNetwork) {
|
||||
builder.addConstraint(NetworkConstraint.KEY).setLifespan(TimeUnit.DAYS.toMillis(30))
|
||||
}
|
||||
PushProcessMessageJobV2(builder.build(), result.envelope.toBuilder().clearContent().build(), result.content, result.metadata, result.serverDeliveredTimestamp)
|
||||
PushProcessMessageJob(builder.build(), result.envelope.toBuilder().clearContent().build(), result.content, result.metadata, result.serverDeliveredTimestamp)
|
||||
} else {
|
||||
try {
|
||||
messageProcessor.process(result.envelope, result.content, result.metadata, result.serverDeliveredTimestamp, localMetric = localReceiveMetric)
|
||||
@@ -41,7 +41,7 @@ final class RequestGroupV2InfoWorkerJob extends BaseJob {
|
||||
@WorkerThread
|
||||
RequestGroupV2InfoWorkerJob(@NonNull GroupId.V2 groupId, int toRevision) {
|
||||
this(new Parameters.Builder()
|
||||
.setQueue(PushProcessMessageJobV2.getQueueName(Recipient.externalGroupExact(groupId).getId()))
|
||||
.setQueue(PushProcessMessageJob.getQueueName(Recipient.externalGroupExact(groupId).getId()))
|
||||
.addConstraint(NetworkConstraint.KEY)
|
||||
.setLifespan(TimeUnit.DAYS.toMillis(1))
|
||||
.setMaxAttempts(Parameters.UNLIMITED)
|
||||
|
||||
@@ -3,7 +3,7 @@ package org.thoughtcrime.securesms.messages
|
||||
import org.signal.ringrtc.CallId
|
||||
import org.thoughtcrime.securesms.database.model.IdentityRecord
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
|
||||
import org.thoughtcrime.securesms.recipients.Recipient
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
import org.thoughtcrime.securesms.ringrtc.RemotePeer
|
||||
|
||||
@@ -48,16 +48,16 @@ import org.thoughtcrime.securesms.jobs.PaymentLedgerUpdateJob
|
||||
import org.thoughtcrime.securesms.jobs.PaymentTransactionCheckJob
|
||||
import org.thoughtcrime.securesms.jobs.ProfileKeySendJob
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
|
||||
import org.thoughtcrime.securesms.jobs.RefreshAttributesJob
|
||||
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob
|
||||
import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob
|
||||
import org.thoughtcrime.securesms.jobs.TrimThreadJob
|
||||
import org.thoughtcrime.securesms.linkpreview.LinkPreview
|
||||
import org.thoughtcrime.securesms.linkpreview.LinkPreviewUtil
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.debug
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.debug
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupMasterKey
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.hasGroupContext
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.hasRemoteDelete
|
||||
@@ -128,10 +128,10 @@ object DataMessageProcessor {
|
||||
val groupSecretParams = if (message.hasGroupContext) GroupSecretParams.deriveFromMasterKey(message.groupV2.groupMasterKey) else null
|
||||
val groupId: GroupId.V2? = if (groupSecretParams != null) GroupId.v2(groupSecretParams.publicParams.groupIdentifier) else null
|
||||
|
||||
var groupProcessResult: MessageContentProcessorV2.Gv2PreProcessResult? = null
|
||||
var groupProcessResult: MessageContentProcessor.Gv2PreProcessResult? = null
|
||||
if (groupId != null) {
|
||||
groupProcessResult = MessageContentProcessorV2.handleGv2PreProcessing(context, envelope.timestamp, content, metadata, groupId, message.groupV2, senderRecipient, groupSecretParams)
|
||||
if (groupProcessResult == MessageContentProcessorV2.Gv2PreProcessResult.IGNORE) {
|
||||
groupProcessResult = MessageContentProcessor.handleGv2PreProcessing(context, envelope.timestamp, content, metadata, groupId, message.groupV2, senderRecipient, groupSecretParams)
|
||||
if (groupProcessResult == MessageContentProcessor.Gv2PreProcessResult.IGNORE) {
|
||||
return
|
||||
}
|
||||
localMetrics?.onGv2Processed()
|
||||
@@ -157,7 +157,7 @@ object DataMessageProcessor {
|
||||
|
||||
if (groupId != null) {
|
||||
val unknownGroup = when (groupProcessResult) {
|
||||
MessageContentProcessorV2.Gv2PreProcessResult.GROUP_UP_TO_DATE -> threadRecipient.isUnknownGroup
|
||||
MessageContentProcessor.Gv2PreProcessResult.GROUP_UP_TO_DATE -> threadRecipient.isUnknownGroup
|
||||
else -> SignalDatabase.groups.isUnknownGroup(groupId)
|
||||
}
|
||||
if (unknownGroup) {
|
||||
@@ -177,17 +177,17 @@ object DataMessageProcessor {
|
||||
SignalExecutors.BOUNDED.execute { ApplicationDependencies.getJobManager().add(SendDeliveryReceiptJob(senderRecipient.id, message.timestamp, messageId)) }
|
||||
} else if (!metadata.sealedSender) {
|
||||
if (RecipientUtil.shouldHaveProfileKey(threadRecipient)) {
|
||||
Log.w(MessageContentProcessorV2.TAG, "Received an unsealed sender message from " + senderRecipient.id + ", but they should already have our profile key. Correcting.")
|
||||
Log.w(MessageContentProcessor.TAG, "Received an unsealed sender message from " + senderRecipient.id + ", but they should already have our profile key. Correcting.")
|
||||
|
||||
if (groupId != null) {
|
||||
Log.i(MessageContentProcessorV2.TAG, "Message was to a GV2 group. Ensuring our group profile keys are up to date.")
|
||||
Log.i(MessageContentProcessor.TAG, "Message was to a GV2 group. Ensuring our group profile keys are up to date.")
|
||||
ApplicationDependencies
|
||||
.getJobManager()
|
||||
.startChain(RefreshAttributesJob(false))
|
||||
.then(GroupV2UpdateSelfProfileKeyJob.withQueueLimits(groupId))
|
||||
.enqueue()
|
||||
} else if (!threadRecipient.isGroup) {
|
||||
Log.i(MessageContentProcessorV2.TAG, "Message was to a 1:1. Ensuring this user has our profile key.")
|
||||
Log.i(MessageContentProcessor.TAG, "Message was to a 1:1. Ensuring this user has our profile key.")
|
||||
val profileSendJob = ProfileKeySendJob.create(SignalDatabase.threads.getOrCreateThreadIdFor(threadRecipient), true)
|
||||
if (profileSendJob != null) {
|
||||
ApplicationDependencies
|
||||
@@ -612,7 +612,7 @@ object DataMessageProcessor {
|
||||
|
||||
val paymentNotification = message.payment.notification
|
||||
val uuid = UUID.randomUUID()
|
||||
val queue = "Payment_" + PushProcessMessageJobV2.getQueueName(senderRecipientId)
|
||||
val queue = "Payment_" + PushProcessMessageJob.getQueueName(senderRecipientId)
|
||||
|
||||
try {
|
||||
SignalDatabase.payments.createIncomingPayment(
|
||||
|
||||
@@ -14,8 +14,8 @@ import org.thoughtcrime.securesms.groups.GroupId
|
||||
import org.thoughtcrime.securesms.jobs.AttachmentDownloadJob
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob
|
||||
import org.thoughtcrime.securesms.jobs.SendDeliveryReceiptJob
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.isMediaMessage
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.toPointersWithinLimit
|
||||
@@ -80,7 +80,7 @@ object EditMessageProcessor {
|
||||
return
|
||||
}
|
||||
|
||||
if (groupId != null && MessageContentProcessorV2.handleGv2PreProcessing(context, envelope.timestamp, content, metadata, groupId, message.groupV2, senderRecipient) == MessageContentProcessorV2.Gv2PreProcessResult.IGNORE) {
|
||||
if (groupId != null && MessageContentProcessor.handleGv2PreProcessing(context, envelope.timestamp, content, metadata, groupId, message.groupV2, senderRecipient) == MessageContentProcessor.Gv2PreProcessResult.IGNORE) {
|
||||
warn(envelope.timestamp, "[handleEditMessage] Group processor indicated we should ignore this.")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -20,8 +20,8 @@ import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil
|
||||
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
|
||||
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil
|
||||
import org.thoughtcrime.securesms.jobs.ForegroundServiceUtil.startWhenCapable
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageErrorV2Job
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageErrorJob
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
|
||||
import org.thoughtcrime.securesms.jobs.UnableToStartException
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.messages.MessageDecryptor.FollowUpOperation
|
||||
@@ -35,7 +35,6 @@ import org.whispersystems.signalservice.api.util.UuidUtil
|
||||
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
|
||||
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException
|
||||
import org.whispersystems.signalservice.internal.push.SignalServiceProtos
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.Semaphore
|
||||
import java.util.concurrent.TimeUnit
|
||||
@@ -94,7 +93,7 @@ class IncomingMessageObserver(private val context: Application) {
|
||||
}
|
||||
}
|
||||
|
||||
private val messageContentProcessor = MessageContentProcessorV2(context)
|
||||
private val messageContentProcessor = MessageContentProcessor(context)
|
||||
|
||||
private var appVisible = false
|
||||
private var lastInteractionTime: Long = System.currentTimeMillis()
|
||||
@@ -282,14 +281,14 @@ class IncomingMessageObserver(private val context: Application) {
|
||||
SignalLocalMetrics.MessageLatency.onMessageReceived(envelope.serverTimestamp, serverDeliveredTimestamp, envelope.urgent)
|
||||
when (result) {
|
||||
is MessageDecryptor.Result.Success -> {
|
||||
val job = PushProcessMessageJobV2.processOrDefer(messageContentProcessor, result, localReceiveMetric)
|
||||
val job = PushProcessMessageJob.processOrDefer(messageContentProcessor, result, localReceiveMetric)
|
||||
if (job != null) {
|
||||
return result.followUpOperations + FollowUpOperation { job }
|
||||
}
|
||||
}
|
||||
is MessageDecryptor.Result.Error -> {
|
||||
return result.followUpOperations + FollowUpOperation {
|
||||
PushProcessMessageErrorV2Job(
|
||||
PushProcessMessageErrorJob(
|
||||
result.toMessageState(),
|
||||
result.errorMetadata.toExceptionMetadata(),
|
||||
result.envelope.timestamp
|
||||
|
||||
@@ -58,7 +58,7 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Typing
|
||||
import java.io.IOException
|
||||
import java.util.Optional
|
||||
|
||||
open class MessageContentProcessorV2(private val context: Context) {
|
||||
open class MessageContentProcessor(private val context: Context) {
|
||||
|
||||
enum class Gv2PreProcessResult {
|
||||
IGNORE,
|
||||
@@ -71,8 +71,8 @@ open class MessageContentProcessorV2(private val context: Context) {
|
||||
|
||||
@JvmStatic
|
||||
@JvmOverloads
|
||||
fun create(context: Context = ApplicationDependencies.getApplication()): MessageContentProcessorV2 {
|
||||
return MessageContentProcessorV2(context)
|
||||
fun create(context: Context = ApplicationDependencies.getApplication()): MessageContentProcessor {
|
||||
return MessageContentProcessor(context)
|
||||
}
|
||||
|
||||
fun debug(message: String) {
|
||||
@@ -6,8 +6,8 @@ import org.thoughtcrime.securesms.database.SignalDatabase
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessEarlyMessagesJob
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.warn
|
||||
import org.thoughtcrime.securesms.recipients.Recipient
|
||||
import org.thoughtcrime.securesms.recipients.RecipientId
|
||||
import org.thoughtcrime.securesms.util.EarlyMessageCacheEntry
|
||||
|
||||
@@ -9,8 +9,8 @@ import org.thoughtcrime.securesms.database.model.databaseprotos.ChatColor
|
||||
import org.thoughtcrime.securesms.database.model.databaseprotos.StoryTextPost
|
||||
import org.thoughtcrime.securesms.database.model.toBodyRangeList
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.toPointer
|
||||
import org.thoughtcrime.securesms.mms.IncomingMediaMessage
|
||||
|
||||
@@ -52,8 +52,8 @@ import org.thoughtcrime.securesms.jobs.RefreshOwnProfileJob
|
||||
import org.thoughtcrime.securesms.jobs.StickerPackDownloadJob
|
||||
import org.thoughtcrime.securesms.keyvalue.SignalStore
|
||||
import org.thoughtcrime.securesms.linkpreview.LinkPreview
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessorV2.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.log
|
||||
import org.thoughtcrime.securesms.messages.MessageContentProcessor.Companion.warn
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupId
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.groupMasterKey
|
||||
import org.thoughtcrime.securesms.messages.SignalServiceProtoUtil.hasGroupContext
|
||||
@@ -183,7 +183,7 @@ object SyncMessageProcessor {
|
||||
val groupId: GroupId.V2? = if (dataMessage.hasGroupContext) GroupId.v2(dataMessage.groupV2.groupMasterKey) else null
|
||||
|
||||
if (groupId != null) {
|
||||
if (MessageContentProcessorV2.handleGv2PreProcessing(context, envelope.timestamp, content, metadata, groupId, dataMessage.groupV2, senderRecipient) == MessageContentProcessorV2.Gv2PreProcessResult.IGNORE) {
|
||||
if (MessageContentProcessor.handleGv2PreProcessing(context, envelope.timestamp, content, metadata, groupId, dataMessage.groupV2, senderRecipient) == MessageContentProcessor.Gv2PreProcessResult.IGNORE) {
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -604,7 +604,7 @@ object SyncMessageProcessor {
|
||||
val dataMessage: DataMessage = sent.message
|
||||
val groupId: GroupId.V2? = dataMessage.groupV2.groupId
|
||||
|
||||
if (MessageContentProcessorV2.updateGv2GroupFromServerOrP2PChange(context, envelope.timestamp, dataMessage.groupV2, SignalDatabase.groups.getGroup(GroupId.v2(dataMessage.groupV2.groupMasterKey))) == null) {
|
||||
if (MessageContentProcessor.updateGv2GroupFromServerOrP2PChange(context, envelope.timestamp, dataMessage.groupV2, SignalDatabase.groups.getGroup(GroupId.v2(dataMessage.groupV2.groupMasterKey))) == null) {
|
||||
log(envelope.timestamp, "Ignoring GV2 message for group we are not currently in $groupId")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker.JobListener
|
||||
import org.thoughtcrime.securesms.jobs.MarkerJob
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJobV2
|
||||
import org.thoughtcrime.securesms.jobs.PushProcessMessageJob
|
||||
import org.thoughtcrime.securesms.util.NetworkUtil
|
||||
import org.thoughtcrime.securesms.util.PowerManagerCompat
|
||||
import org.thoughtcrime.securesms.util.ServiceUtil
|
||||
@@ -81,7 +81,7 @@ object WebSocketDrainer {
|
||||
val queueListener = QueueFindingJobListener()
|
||||
|
||||
jobManager.addListener(
|
||||
{ job: Job -> job.parameters.queue?.startsWith(PushProcessMessageJobV2.QUEUE_PREFIX) ?: false },
|
||||
{ job: Job -> job.parameters.queue?.startsWith(PushProcessMessageJob.QUEUE_PREFIX) ?: false },
|
||||
queueListener
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user