mirror of
https://github.com/signalapp/Signal-Android.git
synced 2025-12-24 21:15:48 +00:00
Acquire group lock before processing a message batch.
This commit is contained in:
@@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
final class GroupsV2ProcessingLock {
|
||||
public final class GroupsV2ProcessingLock {
|
||||
|
||||
private static final String TAG = Log.tag(GroupsV2ProcessingLock.class);
|
||||
|
||||
@@ -20,7 +20,7 @@ final class GroupsV2ProcessingLock {
|
||||
private static final Lock lock = new ReentrantLock();
|
||||
|
||||
@WorkerThread
|
||||
static Closeable acquireGroupProcessingLock() throws GroupChangeBusyException {
|
||||
public static Closeable acquireGroupProcessingLock() throws GroupChangeBusyException {
|
||||
return acquireGroupProcessingLock(5000);
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import org.thoughtcrime.securesms.R
|
||||
import org.thoughtcrime.securesms.crypto.ReentrantSessionLock
|
||||
import org.thoughtcrime.securesms.database.SignalDatabase
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
|
||||
import org.thoughtcrime.securesms.groups.GroupsV2ProcessingLock
|
||||
import org.thoughtcrime.securesms.jobmanager.Job
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker
|
||||
import org.thoughtcrime.securesms.jobmanager.JobTracker.JobListener
|
||||
@@ -395,16 +396,18 @@ class IncomingMessageObserver(private val context: Application) {
|
||||
val bufferedStore = BufferedProtocolStore.create()
|
||||
|
||||
val startTime = System.currentTimeMillis()
|
||||
ReentrantSessionLock.INSTANCE.acquire().use {
|
||||
SignalDatabase.runInTransaction {
|
||||
val followUpOperations: List<FollowUpOperation> = batch
|
||||
.mapNotNull { processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) }
|
||||
.flatten()
|
||||
GroupsV2ProcessingLock.acquireGroupProcessingLock().use {
|
||||
ReentrantSessionLock.INSTANCE.acquire().use {
|
||||
SignalDatabase.runInTransaction {
|
||||
val followUpOperations: List<FollowUpOperation> = batch
|
||||
.mapNotNull { processEnvelope(bufferedStore, it.envelope, it.serverDeliveredTimestamp) }
|
||||
.flatten()
|
||||
|
||||
bufferedStore.flushToDisk()
|
||||
bufferedStore.flushToDisk()
|
||||
|
||||
val jobs = followUpOperations.mapNotNull { it.run() }
|
||||
ApplicationDependencies.getJobManager().addAll(jobs)
|
||||
val jobs = followUpOperations.mapNotNull { it.run() }
|
||||
ApplicationDependencies.getJobManager().addAll(jobs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user