Decrease db thrashing when starting expiration timers for messages.

This commit is contained in:
Cody Henthorne
2023-10-11 16:14:08 -04:00
parent 05296e3d9b
commit 6a6b80cce2
6 changed files with 51 additions and 28 deletions

View File

@@ -559,13 +559,9 @@ class ConversationRepository(
} }
} }
fun startExpirationTimeout(messageRecord: MessageRecord) { fun startExpirationTimeout(expirationInfos: List<MessageTable.ExpirationInfo>) {
SignalExecutors.BOUNDED_IO.execute { SignalDatabase.messages.markExpireStarted(expirationInfos.map { it.id to it.expireStarted })
val now = System.currentTimeMillis() ApplicationDependencies.getExpiringMessageManager().scheduleDeletion(expirationInfos)
SignalDatabase.messages.markExpireStarted(messageRecord.id, now)
ApplicationDependencies.getExpiringMessageManager().scheduleDeletion(messageRecord.id, messageRecord.isMms, now, messageRecord.expiresIn)
}
} }
fun markLastSeen(threadId: Long) { fun markLastSeen(threadId: Long) {

View File

@@ -15,8 +15,10 @@ import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Maybe import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.core.Single import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.CompositeDisposable import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.kotlin.addTo import io.reactivex.rxjava3.kotlin.addTo
import io.reactivex.rxjava3.kotlin.plusAssign import io.reactivex.rxjava3.kotlin.plusAssign
import io.reactivex.rxjava3.kotlin.subscribeBy import io.reactivex.rxjava3.kotlin.subscribeBy
@@ -34,6 +36,7 @@ import org.thoughtcrime.securesms.conversation.ScheduledMessagesRepository
import org.thoughtcrime.securesms.conversation.mutiselect.MultiselectPart import org.thoughtcrime.securesms.conversation.mutiselect.MultiselectPart
import org.thoughtcrime.securesms.conversation.v2.data.ConversationElementKey import org.thoughtcrime.securesms.conversation.v2.data.ConversationElementKey
import org.thoughtcrime.securesms.database.DatabaseObserver import org.thoughtcrime.securesms.database.DatabaseObserver
import org.thoughtcrime.securesms.database.MessageTable
import org.thoughtcrime.securesms.database.model.IdentityRecord import org.thoughtcrime.securesms.database.model.IdentityRecord
import org.thoughtcrime.securesms.database.model.Mention import org.thoughtcrime.securesms.database.model.Mention
import org.thoughtcrime.securesms.database.model.MessageId import org.thoughtcrime.securesms.database.model.MessageId
@@ -67,6 +70,7 @@ import org.thoughtcrime.securesms.util.rx.RxStore
import org.thoughtcrime.securesms.wallpaper.ChatWallpaper import org.thoughtcrime.securesms.wallpaper.ChatWallpaper
import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.push.ServiceId
import java.util.Optional import java.util.Optional
import java.util.concurrent.TimeUnit
import kotlin.time.Duration import kotlin.time.Duration
/** /**
@@ -141,6 +145,8 @@ class ConversationViewModel(
.distinctUntilChanged() .distinctUntilChanged()
.observeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread())
private val startExpiration = BehaviorSubject.create<MessageTable.ExpirationInfo>()
init { init {
disposables += recipient disposables += recipient
.subscribeBy { .subscribeBy {
@@ -233,6 +239,16 @@ class ConversationViewModel(
identityRecordsStore.update { newState } identityRecordsStore.update { newState }
} }
.addTo(disposables) .addTo(disposables)
startExpiration
.buffer(startExpiration.throttleLast(1, TimeUnit.SECONDS))
.observeOn(Schedulers.io())
.subscribe(object : Observer<List<MessageTable.ExpirationInfo>> {
override fun onNext(t: List<MessageTable.ExpirationInfo>) = repository.startExpirationTimeout(t.distinctBy { it.id })
override fun onSubscribe(d: Disposable) = Unit
override fun onError(e: Throwable) = Unit
override fun onComplete() = Unit
})
} }
fun setSearchQuery(query: String?) { fun setSearchQuery(query: String?) {
@@ -245,6 +261,7 @@ class ConversationViewModel(
override fun onCleared() { override fun onCleared() {
disposables.clear() disposables.clear()
startExpiration.onComplete()
} }
fun setShowScrollButtonsForScrollPosition(showScrollButtons: Boolean, willScrollToBottomOnNewMessage: Boolean) { fun setShowScrollButtonsForScrollPosition(showScrollButtons: Boolean, willScrollToBottomOnNewMessage: Boolean) {
@@ -312,7 +329,14 @@ class ConversationViewModel(
} }
fun startExpirationTimeout(messageRecord: MessageRecord) { fun startExpirationTimeout(messageRecord: MessageRecord) {
repository.startExpirationTimeout(messageRecord) startExpiration.onNext(
MessageTable.ExpirationInfo(
id = messageRecord.id,
expiresIn = messageRecord.expiresIn,
expireStarted = System.currentTimeMillis(),
isMms = messageRecord.isMms
)
)
} }
fun updateReaction(messageRecord: MessageRecord, emoji: String): Completable { fun updateReaction(messageRecord: MessageRecord, emoji: String): Completable {

View File

@@ -49,10 +49,6 @@ public abstract class DatabaseTable {
protected void notifyConversationListeners(Set<Long> threadIds) { protected void notifyConversationListeners(Set<Long> threadIds) {
ApplicationDependencies.getDatabaseObserver().notifyConversationListeners(threadIds); ApplicationDependencies.getDatabaseObserver().notifyConversationListeners(threadIds);
for (long threadId : threadIds) {
notifyConversationListeners(threadId);
}
} }
protected void notifyConversationListeners(long threadId) { protected void notifyConversationListeners(long threadId) {

View File

@@ -2186,27 +2186,19 @@ open class MessageTable(context: Context?, databaseHelper: SignalDatabase) : Dat
@JvmOverloads @JvmOverloads
fun markExpireStarted(id: Long, startedTimestamp: Long = System.currentTimeMillis()) { fun markExpireStarted(id: Long, startedTimestamp: Long = System.currentTimeMillis()) {
markExpireStarted(setOf(id), startedTimestamp) markExpireStarted(setOf(id to startedTimestamp))
} }
fun markExpireStarted(ids: Collection<Long>, startedAtTimestamp: Long) { fun markExpireStarted(ids: Collection<kotlin.Pair<Long, Long>>) {
var threadId: Long = -1
writableDatabase.withinTransaction { db -> writableDatabase.withinTransaction { db ->
for (id in ids) { for ((id, startedAtTimestamp) in ids) {
db.update(TABLE_NAME) db.update(TABLE_NAME)
.values(EXPIRE_STARTED to startedAtTimestamp) .values(EXPIRE_STARTED to startedAtTimestamp)
.where("$ID = ? AND ($EXPIRE_STARTED = 0 OR $EXPIRE_STARTED > ?)", id, startedAtTimestamp) .where("$ID = ? AND ($EXPIRE_STARTED = 0 OR $EXPIRE_STARTED > ?)", id, startedAtTimestamp)
.run() .run()
ApplicationDependencies.getDatabaseObserver().notifyMessageUpdateObservers(MessageId(id))
if (threadId < 0) {
threadId = getThreadIdForMessage(id)
} }
} }
threads.update(threadId, false)
}
notifyConversationListeners(threadId)
} }
fun markAsNotified(id: Long) { fun markAsNotified(id: Long) {

View File

@@ -104,11 +104,11 @@ public class MarkReadReceiver extends BroadcastReceiver {
private static void scheduleDeletion(@NonNull List<ExpirationInfo> expirationInfo) { private static void scheduleDeletion(@NonNull List<ExpirationInfo> expirationInfo) {
if (expirationInfo.size() > 0) { if (expirationInfo.size() > 0) {
SignalDatabase.messages().markExpireStarted(Stream.of(expirationInfo).map(ExpirationInfo::getId).toList(), System.currentTimeMillis()); long now = System.currentTimeMillis();
SignalDatabase.messages().markExpireStarted(Stream.of(expirationInfo).map(info -> new kotlin.Pair<>(info.getId(), now)).toList());
ExpiringMessageManager expirationManager = ApplicationDependencies.getExpiringMessageManager(); ApplicationDependencies.getExpiringMessageManager()
.scheduleDeletion(Stream.of(expirationInfo).map(info -> info.copy(info.getId(), info.getExpiresIn(), now, info.isMms())).toList());
expirationInfo.stream().forEach(info -> expirationManager.scheduleDeletion(info.getId(), info.isMms(), info.getExpiresIn()));
} }
} }
} }

View File

@@ -2,15 +2,19 @@ package org.thoughtcrime.securesms.service;
import android.content.Context; import android.content.Context;
import androidx.annotation.NonNull;
import org.signal.core.util.logging.Log; import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.database.MessageTable; import org.thoughtcrime.securesms.database.MessageTable;
import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.MessageRecord;
import java.util.Comparator; import java.util.Comparator;
import java.util.List;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ExpiringMessageManager { public class ExpiringMessageManager {
@@ -45,6 +49,17 @@ public class ExpiringMessageManager {
} }
} }
public void scheduleDeletion(@NonNull List<MessageTable.ExpirationInfo> expirationInfos) {
List<ExpiringMessageReference> references = expirationInfos.stream()
.map(info -> new ExpiringMessageReference(info.getId(), info.isMms(), info.getExpireStarted() + info.getExpiresIn()))
.collect(Collectors.toList());
synchronized (expiringMessageReferences) {
expiringMessageReferences.addAll(references);
expiringMessageReferences.notifyAll();
}
}
public void checkSchedule() { public void checkSchedule() {
synchronized (expiringMessageReferences) { synchronized (expiringMessageReferences) {
expiringMessageReferences.notifyAll(); expiringMessageReferences.notifyAll();