diff --git a/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java b/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java index 9b02c18364..822ab5ced8 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java +++ b/app/src/main/java/org/thoughtcrime/securesms/recipients/LiveRecipient.java @@ -6,13 +6,10 @@ import androidx.annotation.NonNull; import androidx.annotation.WorkerThread; import androidx.lifecycle.LifecycleOwner; import androidx.lifecycle.LiveData; -import androidx.lifecycle.MutableLiveData; import androidx.lifecycle.Observer; import org.signal.core.util.ThreadUtil; import org.signal.core.util.logging.Log; -import org.thoughtcrime.securesms.database.DistributionListTables; -import org.thoughtcrime.securesms.database.GroupTable; import org.thoughtcrime.securesms.database.RecipientTable; import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.database.model.RecipientRecord; @@ -23,7 +20,9 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers; import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.subjects.BehaviorSubject; public final class LiveRecipient { @@ -31,26 +30,21 @@ public final class LiveRecipient { private static final String TAG = Log.tag(LiveRecipient.class); private final Context context; - private final MutableLiveData liveData; private final LiveData observableLiveData; private final LiveData observableLiveDataResolved; private final Set observers; private final Observer foreverObserver; private final AtomicReference recipient; private final RecipientTable recipientTable; - private final GroupTable groupDatabase; - private final DistributionListTables distributionListTables; - private final MutableLiveData refreshForceNotify; private final BehaviorSubject subject; + private final BehaviorSubject forceNotifySubject; LiveRecipient(@NonNull Context context, @NonNull Recipient defaultRecipient) { this.context = context.getApplicationContext(); - this.liveData = new MutableLiveData<>(defaultRecipient); this.subject = BehaviorSubject.createDefault(defaultRecipient); + this.forceNotifySubject = BehaviorSubject.createDefault(new Object()); this.recipient = new AtomicReference<>(defaultRecipient); this.recipientTable = SignalDatabase.recipients(); - this.groupDatabase = SignalDatabase.groups(); - this.distributionListTables = SignalDatabase.distributionLists(); this.observers = new CopyOnWriteArraySet<>(); this.foreverObserver = recipient -> { ThreadUtil.postToMain(() -> { @@ -59,10 +53,9 @@ public final class LiveRecipient { } }); }; - this.refreshForceNotify = new MutableLiveData<>(new Object()); - this.observableLiveData = LiveDataUtil.combineLatest(LiveDataUtil.distinctUntilChanged(liveData, Recipient::hasSameContent), - refreshForceNotify, - (recipient, force) -> recipient); + this.observableLiveData = new ObservableLiveData<>(Observable.combineLatest(subject.distinctUntilChanged(Recipient::hasSameContent), + forceNotifySubject, + (recipient, force) -> recipient)); this.observableLiveDataResolved = LiveDataUtil.filter(this.observableLiveData, r -> !r.isResolving()); } @@ -178,7 +171,7 @@ public final class LiveRecipient { Recipient recipient = fetchAndCacheRecipientFromDisk(id); set(recipient); - refreshForceNotify.postValue(new Object()); + forceNotifySubject.onNext(new Object()); } public @NonNull LiveData getLiveData() { @@ -205,7 +198,6 @@ public final class LiveRecipient { synchronized void set(@NonNull Recipient recipient) { this.recipient.set(recipient); - this.liveData.postValue(recipient); this.subject.onNext(recipient); } @@ -221,4 +213,32 @@ public final class LiveRecipient { public int hashCode() { return Objects.hash(recipient); } + + /** + * A {@link LiveData} backed by an {@link Observable}. Subscribes to the source only when there + * are active observers ({@link #onActive()}) and disposes when all observers are removed + * ({@link #onInactive()}). This ensures no main thread work is scheduled for unobserved instances. + */ + private static class ObservableLiveData extends LiveData { + private final Observable source; + private Disposable disposable; + + ObservableLiveData(@NonNull Observable source) { + this.source = source; + } + + @Override + protected void onActive() { + disposable = source.observeOn(AndroidSchedulers.mainThread()) + .subscribe(this::setValue); + } + + @Override + protected void onInactive() { + if (disposable != null) { + disposable.dispose(); + disposable = null; + } + } + } }