Prevent main thread posts when unobserved LiveRecipients change.

This commit is contained in:
Cody Henthorne
2026-03-06 12:10:04 -05:00
committed by jeffrey-signal
parent cf7fee2de8
commit b0b2c32a6f

View File

@@ -6,13 +6,10 @@ import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;
import androidx.lifecycle.Observer;
import org.signal.core.util.ThreadUtil;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.database.DistributionListTables;
import org.thoughtcrime.securesms.database.GroupTable;
import org.thoughtcrime.securesms.database.RecipientTable;
import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.database.model.RecipientRecord;
@@ -23,7 +20,9 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
public final class LiveRecipient {
@@ -31,26 +30,21 @@ public final class LiveRecipient {
private static final String TAG = Log.tag(LiveRecipient.class);
private final Context context;
private final MutableLiveData<Recipient> liveData;
private final LiveData<Recipient> observableLiveData;
private final LiveData<Recipient> observableLiveDataResolved;
private final Set<RecipientForeverObserver> observers;
private final Observer<Recipient> foreverObserver;
private final AtomicReference<Recipient> recipient;
private final RecipientTable recipientTable;
private final GroupTable groupDatabase;
private final DistributionListTables distributionListTables;
private final MutableLiveData<Object> refreshForceNotify;
private final BehaviorSubject<Recipient> subject;
private final BehaviorSubject<Object> forceNotifySubject;
LiveRecipient(@NonNull Context context, @NonNull Recipient defaultRecipient) {
this.context = context.getApplicationContext();
this.liveData = new MutableLiveData<>(defaultRecipient);
this.subject = BehaviorSubject.createDefault(defaultRecipient);
this.forceNotifySubject = BehaviorSubject.createDefault(new Object());
this.recipient = new AtomicReference<>(defaultRecipient);
this.recipientTable = SignalDatabase.recipients();
this.groupDatabase = SignalDatabase.groups();
this.distributionListTables = SignalDatabase.distributionLists();
this.observers = new CopyOnWriteArraySet<>();
this.foreverObserver = recipient -> {
ThreadUtil.postToMain(() -> {
@@ -59,10 +53,9 @@ public final class LiveRecipient {
}
});
};
this.refreshForceNotify = new MutableLiveData<>(new Object());
this.observableLiveData = LiveDataUtil.combineLatest(LiveDataUtil.distinctUntilChanged(liveData, Recipient::hasSameContent),
refreshForceNotify,
(recipient, force) -> recipient);
this.observableLiveData = new ObservableLiveData<>(Observable.combineLatest(subject.distinctUntilChanged(Recipient::hasSameContent),
forceNotifySubject,
(recipient, force) -> recipient));
this.observableLiveDataResolved = LiveDataUtil.filter(this.observableLiveData, r -> !r.isResolving());
}
@@ -178,7 +171,7 @@ public final class LiveRecipient {
Recipient recipient = fetchAndCacheRecipientFromDisk(id);
set(recipient);
refreshForceNotify.postValue(new Object());
forceNotifySubject.onNext(new Object());
}
public @NonNull LiveData<Recipient> getLiveData() {
@@ -205,7 +198,6 @@ public final class LiveRecipient {
synchronized void set(@NonNull Recipient recipient) {
this.recipient.set(recipient);
this.liveData.postValue(recipient);
this.subject.onNext(recipient);
}
@@ -221,4 +213,32 @@ public final class LiveRecipient {
public int hashCode() {
return Objects.hash(recipient);
}
/**
* A {@link LiveData} backed by an {@link Observable}. Subscribes to the source only when there
* are active observers ({@link #onActive()}) and disposes when all observers are removed
* ({@link #onInactive()}). This ensures no main thread work is scheduled for unobserved instances.
*/
private static class ObservableLiveData<T> extends LiveData<T> {
private final Observable<T> source;
private Disposable disposable;
ObservableLiveData(@NonNull Observable<T> source) {
this.source = source;
}
@Override
protected void onActive() {
disposable = source.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::setValue);
}
@Override
protected void onInactive() {
if (disposable != null) {
disposable.dispose();
disposable = null;
}
}
}
}