Display audio levels for each participant in group calls.

This commit is contained in:
Rashad Sookram
2022-04-01 17:09:56 -04:00
committed by Cody Henthorne
parent a9f208153c
commit ec92d5ddb7
19 changed files with 379 additions and 32 deletions

View File

@@ -14,6 +14,7 @@ import com.annimon.stream.function.Predicate;
import org.signal.core.util.concurrent.SignalExecutors;
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor;
import org.whispersystems.signalservice.api.util.Preconditions;
import java.util.LinkedHashSet;
import java.util.List;
@@ -98,6 +99,20 @@ public final class LiveDataUtil {
return new CombineLiveData<>(a, b, combine);
}
/**
* Once there is non-null data on each input {@link LiveData}, the {@link Combine3} function is
* run and produces a live data of the combined data.
* <p>
* As each live data changes, the combine function is re-run, and a new value is emitted always
* with the latest, non-null values.
*/
public static <A, B, C, R> LiveData<R> combineLatest(@NonNull LiveData<A> a,
@NonNull LiveData<B> b,
@NonNull LiveData<C> c,
@NonNull Combine3<A, B, C, R> combine) {
return new Combine3LiveData<>(a, b, c, combine);
}
/**
* Merges the supplied live data streams.
*/
@@ -285,4 +300,41 @@ public final class LiveDataUtil {
}
}
}
private static final class Combine3LiveData<A, B, C, R> extends MediatorLiveData<R> {
private A a;
private B b;
private C c;
Combine3LiveData(LiveData<A> liveDataA, LiveData<B> liveDataB, LiveData<C> liveDataC, Combine3<A, B, C, R> combine) {
Preconditions.checkArgument(liveDataA != liveDataB && liveDataB != liveDataC && liveDataA != liveDataC);
addSource(liveDataA, (a) -> {
if (a != null) {
this.a = a;
if (b != null && c != null) {
setValue(combine.apply(a, b, c));
}
}
});
addSource(liveDataB, (b) -> {
if (b != null) {
this.b = b;
if (a != null && c != null) {
setValue(combine.apply(a, b, c));
}
}
});
addSource(liveDataC, (c) -> {
if (c != null) {
this.c = c;
if (a != null && b != null) {
setValue(combine.apply(a, b, c));
}
}
});
}
}
}

View File

@@ -1,15 +1,19 @@
package org.thoughtcrime.securesms.util.rx
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
/**
* Rx replacement for Store.
* Actions are run on the computation thread.
* Actions are run on the computation thread by default.
*/
class RxStore<T : Any>(defaultValue: T) {
class RxStore<T : Any>(
defaultValue: T,
private val scheduler: Scheduler = Schedulers.computation()
) {
private val behaviorProcessor = BehaviorProcessor.createDefault(defaultValue)
private val actionSubject = PublishSubject.create<(T) -> T>().toSerialized()
@@ -19,7 +23,7 @@ class RxStore<T : Any>(defaultValue: T) {
init {
actionSubject
.observeOn(Schedulers.computation())
.observeOn(scheduler)
.scan(defaultValue) { v, f -> f(v) }
.subscribe { behaviorProcessor.onNext(it) }
}