Add Observable for LiveRecipient.

This commit is contained in:
Greyson Parrelli
2023-02-17 10:34:34 -05:00
parent 21df032b04
commit dad9980a80
4 changed files with 24 additions and 23 deletions

View File

@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
public final class LiveRecipient {
@@ -41,15 +42,17 @@ public final class LiveRecipient {
private final LiveData<Recipient> observableLiveDataResolved;
private final Set<RecipientForeverObserver> observers;
private final Observer<Recipient> foreverObserver;
private final AtomicReference<Recipient> recipient;
private final RecipientTable recipientTable;
private final GroupTable groupDatabase;
private final DistributionListTables distributionListTables;
private final MutableLiveData<Object> refreshForceNotify;
private final AtomicReference<Recipient> recipient;
private final RecipientTable recipientTable;
private final GroupTable groupDatabase;
private final DistributionListTables distributionListTables;
private final MutableLiveData<Object> refreshForceNotify;
private final BehaviorSubject<Recipient> subject;
LiveRecipient(@NonNull Context context, @NonNull Recipient defaultRecipient) {
this.context = context.getApplicationContext();
this.liveData = new MutableLiveData<>(defaultRecipient);
this.subject = BehaviorSubject.createDefault(defaultRecipient);
this.recipient = new AtomicReference<>(defaultRecipient);
this.recipientTable = SignalDatabase.recipients();
this.groupDatabase = SignalDatabase.groups();
@@ -80,6 +83,13 @@ public final class LiveRecipient {
return recipient.get();
}
/**
* @return An rx-flavored {@link Observable}.
*/
public @NonNull Observable<Recipient> observable() {
return subject.distinctUntilChanged(Recipient::hasSameContent);
}
/**
* Watch the recipient for changes. The callback will only be invoked if the provided lifecycle is
* in a valid state. No need to remove the observer. If you do wish to remove the observer (if,
@@ -97,19 +107,6 @@ public final class LiveRecipient {
ThreadUtil.runOnMain(() -> observableLiveData.removeObservers(owner));
}
public Observable<Recipient> asObservable() {
return Observable.create(emitter -> {
Recipient current = recipient.get();
if (current != null && current.getId() != RecipientId.UNKNOWN) {
emitter.onNext(current);
}
RecipientForeverObserver foreverObserver = emitter::onNext;
observeForever(foreverObserver);
emitter.setCancellable(() -> removeForeverObserver(foreverObserver));
});
}
/**
* Watch the recipient for changes. The callback could be invoked at any time. You MUST call
* {@link #removeForeverObserver(RecipientForeverObserver)} when finished. You should use
@@ -243,6 +240,7 @@ public final class LiveRecipient {
synchronized void set(@NonNull Recipient recipient) {
this.recipient.set(recipient);
this.liveData.postValue(recipient);
this.subject.onNext(recipient);
}
@Override