Migrate WebRtcCallViewModel to Flow apis.

This commit is contained in:
Alex Hart
2025-02-07 11:14:49 -04:00
committed by Greyson Parrelli
parent 355c3ff155
commit 5adba60e75
6 changed files with 311 additions and 180 deletions

View File

@@ -23,8 +23,9 @@ import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.Text
import androidx.compose.material3.TextButton
import androidx.compose.runtime.Composable
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.remember
import androidx.compose.runtime.rxjava3.subscribeAsState
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
@@ -39,6 +40,7 @@ import androidx.compose.ui.unit.dp
import androidx.compose.ui.viewinterop.AndroidView
import androidx.core.os.bundleOf
import androidx.fragment.app.setFragmentResult
import kotlinx.coroutines.flow.map
import org.signal.core.ui.BottomSheets
import org.signal.core.ui.Buttons
import org.signal.core.ui.DarkPreview
@@ -81,12 +83,13 @@ class PendingParticipantsBottomSheet : ComposeBottomSheetDialogFragment() {
System.currentTimeMillis().milliseconds
}
val participants = viewModel.getPendingParticipants()
.map { it.pendingParticipantCollection.getAllPendingParticipants(launchTime).toList() }
.subscribeAsState(initial = emptyList())
val participants by remember {
viewModel.getPendingParticipants()
.map { it.pendingParticipantCollection.getAllPendingParticipants(launchTime).toList() }
}.collectAsState(initial = emptyList())
PendingParticipantsSheet(
pendingParticipants = participants.value,
pendingParticipants = participants,
onApproveAll = this::onApproveAll,
onDenyAll = this::onDenyAll,
onApprove = this::onApprove,

View File

@@ -30,11 +30,11 @@ import androidx.compose.material3.Text
import androidx.compose.material3.TextButton
import androidx.compose.runtime.Composable
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.livedata.observeAsState
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.rxjava3.subscribeAsState
import androidx.compose.runtime.setValue
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
@@ -51,6 +51,7 @@ import androidx.lifecycle.toLiveData
import com.google.android.material.dialog.MaterialAlertDialogBuilder
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.flow.map
import org.signal.core.ui.Dialogs
import org.signal.core.ui.Dividers
import org.signal.core.ui.Previews
@@ -83,22 +84,22 @@ object CallInfoView {
callbacks: Callbacks,
modifier: Modifier
) {
val participantsState: ParticipantsState by webRtcCallViewModel.callParticipantsState
.toFlowable(BackpressureStrategy.LATEST)
.map { state ->
ParticipantsState(
inCallLobby = state.callState == WebRtcViewModel.State.CALL_PRE_JOIN,
ringGroup = state.ringGroup,
includeSelf = state.groupCallState === WebRtcViewModel.GroupCallState.CONNECTED_AND_JOINED || state.groupCallState === WebRtcViewModel.GroupCallState.IDLE,
participantCount = if (state.participantCount.isPresent) state.participantCount.asLong.toInt() else 0,
remoteParticipants = state.allRemoteParticipants.sortedBy { it.callParticipantId.recipientId },
localParticipant = state.localParticipant,
groupMembers = state.groupMembers.filterNot { it.member.isSelf },
callRecipient = state.recipient,
raisedHands = state.raisedHands
)
}
.subscribeAsState(ParticipantsState())
val participantsState: ParticipantsState by remember {
webRtcCallViewModel.callParticipantsState
.map { state ->
ParticipantsState(
inCallLobby = state.callState == WebRtcViewModel.State.CALL_PRE_JOIN,
ringGroup = state.ringGroup,
includeSelf = state.groupCallState === WebRtcViewModel.GroupCallState.CONNECTED_AND_JOINED || state.groupCallState === WebRtcViewModel.GroupCallState.IDLE,
participantCount = if (state.participantCount.isPresent) state.participantCount.asLong.toInt() else 0,
remoteParticipants = state.allRemoteParticipants.sortedBy { it.callParticipantId.recipientId },
localParticipant = state.localParticipant,
groupMembers = state.groupMembers.filterNot { it.member.isSelf },
callRecipient = state.recipient,
raisedHands = state.raisedHands
)
}
}.collectAsState(ParticipantsState())
val controlAndInfoState: ControlAndInfoState by controlsAndInfoViewModel.state

View File

@@ -26,11 +26,11 @@ import androidx.compose.material3.Text
import androidx.compose.material3.TextButton
import androidx.compose.runtime.Composable
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.derivedStateOf
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.rxjava3.subscribeAsState
import androidx.compose.runtime.setValue
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
@@ -43,8 +43,8 @@ import androidx.compose.ui.res.vectorResource
import androidx.compose.ui.semantics.Role
import androidx.compose.ui.tooling.preview.Preview
import androidx.compose.ui.unit.dp
import io.reactivex.rxjava3.core.BackpressureStrategy
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.map
import org.signal.core.ui.theme.SignalTheme
import org.thoughtcrime.securesms.R
import org.thoughtcrime.securesms.components.webrtc.v2.WebRtcCallViewModel
@@ -65,26 +65,27 @@ object RaiseHandSnackbar {
fun View(webRtcCallViewModel: WebRtcCallViewModel, showCallInfoListener: () -> Unit, modifier: Modifier = Modifier) {
var expansionState by remember { mutableStateOf(ExpansionState(shouldExpand = false, forced = false)) }
val raisedHandsState by webRtcCallViewModel.callParticipantsState
.toFlowable(BackpressureStrategy.LATEST)
.map { state ->
val raisedHands = state.raisedHands.sortedBy {
if (it.sender.isSelf) {
if (it.sender.isPrimary) {
0
val raisedHandsState by remember {
webRtcCallViewModel.callParticipantsState
.map { state ->
val raisedHands = state.raisedHands.sortedBy {
if (it.sender.isSelf) {
if (it.sender.isPrimary) {
0
} else {
1
}
} else {
1
it.timestamp
}
} else {
it.timestamp
}
val shouldExpand = RaiseHandState.shouldExpand(raisedHands)
if (!expansionState.forced) {
expansionState = ExpansionState(shouldExpand, false)
}
raisedHands
}
val shouldExpand = RaiseHandState.shouldExpand(raisedHands)
if (!expansionState.forced) {
expansionState = ExpansionState(shouldExpand, false)
}
raisedHands
}.subscribeAsState(initial = emptyList())
}.collectAsState(initial = emptyList())
val state by remember {
derivedStateOf {

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.components.webrtc.v2
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.rx3.asFlow
import kotlinx.coroutines.withContext
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.GroupRecord
import org.thoughtcrime.securesms.groups.ui.GroupMemberEntry
import org.thoughtcrime.securesms.recipients.LiveRecipient
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
/**
* Repository providing different fields about the call peer.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class CallPeerRepository(
externalScope: CoroutineScope
) {
val recipientId = MutableStateFlow(RecipientId.UNKNOWN)
val liveRecipient: StateFlow<LiveRecipient> = recipientId.mapLatest { Recipient.live(it) }
.stateIn(externalScope, SharingStarted.Eagerly, Recipient.live(RecipientId.UNKNOWN))
private val recipient: Flow<Recipient> = liveRecipient.flatMapLatest { it.observable().asFlow() }
private val groupRecipient = recipient.filter { it.isActiveGroup }
private val groupRecord: Flow<GroupRecord> = groupRecipient.mapLatest {
withContext(Dispatchers.IO) {
SignalDatabase.groups.getGroup(it.requireGroupId()).get()
}
}
val groupMembers: Flow<List<GroupMemberEntry.FullMember>> = groupRecord.mapLatest { record ->
withContext(Dispatchers.IO) {
record.members.map { Recipient.resolved(it) }.map { GroupMemberEntry.FullMember(it, record.isAdmin(it)) }
}
}
val groupMembersChanged: Flow<List<GroupMemberEntry.FullMember>> = groupMembers.drop(1)
val groupMembersCount: Flow<Int> = groupMembers.map { it.size }
}

View File

@@ -25,15 +25,20 @@ import androidx.annotation.RequiresApi
import androidx.appcompat.app.AppCompatDelegate
import androidx.core.content.ContextCompat
import androidx.core.util.Consumer
import androidx.lifecycle.toLiveData
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import androidx.window.java.layout.WindowInfoTrackerCallbackAdapter
import androidx.window.layout.FoldingFeature
import androidx.window.layout.WindowInfoTracker
import androidx.window.layout.WindowLayoutInfo
import com.google.android.material.dialog.MaterialAlertDialogBuilder
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.disposables.Disposable
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.launch
import org.greenrobot.eventbus.EventBus
import org.greenrobot.eventbus.Subscribe
import org.greenrobot.eventbus.ThreadMode
@@ -88,7 +93,6 @@ import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.thoughtcrime.securesms.util.ThrottledDebouncer
import org.thoughtcrime.securesms.util.VibrateUtil
import org.thoughtcrime.securesms.util.WindowUtil
import org.thoughtcrime.securesms.util.livedata.LiveDataUtil
import org.thoughtcrime.securesms.util.visible
import org.thoughtcrime.securesms.webrtc.CallParticipantsViewState
import org.thoughtcrime.securesms.webrtc.audio.SignalAudioManager
@@ -211,12 +215,12 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re
if (!hasCameraPermission() && !hasAudioPermission()) {
askCameraAudioPermissions {
callScreen.setMicEnabled(viewModel.microphoneEnabled.value ?: false)
callScreen.setMicEnabled(viewModel.microphoneEnabled.value)
handleSetMuteVideo(false)
}
} else if (!hasAudioPermission()) {
askAudioPermissions {
callScreen.setMicEnabled(viewModel.microphoneEnabled.value ?: false)
callScreen.setMicEnabled(viewModel.microphoneEnabled.value)
}
}
}
@@ -498,36 +502,74 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re
viewModel.setIsLandscapeEnabled(true)
viewModel.setIsInPipMode(isInPipMode())
viewModel.microphoneEnabled.observe(this, callScreen::setMicEnabled)
viewModel.getWebRtcControls().observe(this) { controls ->
callScreen.setWebRtcControls(controls)
controlsAndInfo.updateControls(controls)
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
viewModel.microphoneEnabled.collectLatest {
callScreen.setMicEnabled(it)
}
}
launch {
viewModel.getWebRtcControls().collectLatest {
callScreen.setWebRtcControls(it)
controlsAndInfo.updateControls(it)
}
}
launch {
viewModel.getEvents().collect { handleViewModelEvent(it) }
}
launch {
viewModel.getInCallStatus().collectLatest {
handleInCallStatus(it)
}
}
launch {
viewModel.getRecipientFlow().collectLatest {
callScreen.setRecipient(it)
}
}
launch {
val isStartedFromCallLink = getCallIntent().isStartedFromCallLink
combine(
viewModel.callParticipantsState,
viewModel.getEphemeralState().filterNotNull()
) { state, ephemeralState ->
CallParticipantsViewState(state, ephemeralState, orientation == Orientation.PORTRAIT_BOTTOM_EDGE, true, isStartedFromCallLink)
}.collectLatest(callScreen::updateCallParticipants)
}
launch {
viewModel.getCallParticipantListUpdate().collectLatest(participantUpdateWindow::addCallParticipantListUpdate)
}
launch {
viewModel.getSafetyNumberChangeEvent().collect { handleSafetyNumberChangeEvent(it) }
}
launch {
viewModel.getGroupMembersChanged().collectLatest { updateGroupMembersForGroupCall() }
}
launch {
viewModel.getGroupMemberCount().collectLatest { handleGroupMemberCountChange(it) }
}
launch {
viewModel.shouldShowSpeakerHint().collectLatest { updateSpeakerHint(it) }
}
}
}
viewModel.getEvents().observe(this, this::handleViewModelEvent)
lifecycleDisposable.add(viewModel.getInCallStatus().subscribe(this::handleInCallStatus))
lifecycleDisposable.add(viewModel.getRecipientFlowable().subscribe(callScreen::setRecipient))
val isStartedFromCallLink = getCallIntent().isStartedFromCallLink
LiveDataUtil.combineLatest(
viewModel.callParticipantsState.toFlowable(BackpressureStrategy.LATEST).toLiveData(),
viewModel.getEphemeralState()
) { state, ephemeralState ->
CallParticipantsViewState(state, ephemeralState, orientation == Orientation.PORTRAIT_BOTTOM_EDGE, true, isStartedFromCallLink)
}.observe(this, callScreen::updateCallParticipants)
viewModel.getCallParticipantListUpdate().observe(this, participantUpdateWindow::addCallParticipantListUpdate)
viewModel.getSafetyNumberChangeEvent().observe(this, this::handleSafetyNumberChangeEvent)
viewModel.getGroupMembersChanged().observe(this) { updateGroupMembersForGroupCall() }
viewModel.getGroupMemberCount().observe(this, this::handleGroupMemberCountChange)
lifecycleDisposable.add(viewModel.shouldShowSpeakerHint().subscribe(this::updateSpeakerHint))
callScreen.viewTreeObserver.addOnGlobalLayoutListener {
val state = viewModel.callParticipantsStateSnapshot
if (state != null) {
if (state.needsNewRequestSizes()) {
requestNewSizesThrottle.publish { AppDependencies.signalCallManager.updateRenderedResolutions() }
}
if (state.needsNewRequestSizes()) {
requestNewSizesThrottle.publish { AppDependencies.signalCallManager.updateRenderedResolutions() }
}
}
@@ -542,7 +584,14 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re
}
callScreen.setPendingParticipantsViewListener(PendingParticipantsViewListener())
lifecycleDisposable += viewModel.getPendingParticipants().subscribe(callScreen::updatePendingParticipantsList)
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
viewModel.getPendingParticipants().collect(callScreen::updatePendingParticipantsList)
}
}
}
}
private fun initializePictureInPictureParams() {
@@ -558,9 +607,15 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re
pipBuilderParams.setAspectRatio(aspectRatio)
if (Build.VERSION.SDK_INT >= 31) {
viewModel.canEnterPipMode().observe(this) {
pipBuilderParams.setAutoEnterEnabled(it)
tryToSetPictureInPictureParams()
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
viewModel.canEnterPipMode().collectLatest {
pipBuilderParams.setAutoEnterEnabled(it)
tryToSetPictureInPictureParams()
}
}
}
}
} else {
tryToSetPictureInPictureParams()
@@ -1077,7 +1132,7 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re
override fun onHidden() {
val controlState = viewModel.getWebRtcControls().value
if (controlState == null || !controlState.displayErrorControls()) {
if (!controlState.displayErrorControls()) {
fullscreenHelper.hideSystemUI()
videoTooltip?.dismiss()
}

View File

@@ -8,22 +8,24 @@ package org.thoughtcrime.securesms.components.webrtc.v2
import android.os.Handler
import android.os.Looper
import androidx.annotation.MainThread
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.Observer
import androidx.lifecycle.ViewModel
import androidx.lifecycle.distinctUntilChanged
import androidx.lifecycle.map
import androidx.lifecycle.switchMap
import androidx.lifecycle.toPublisher
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.subjects.BehaviorSubject
import org.signal.core.util.ThreadUtil
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx3.asFlow
import org.thoughtcrime.securesms.components.webrtc.CallParticipantListUpdate
import org.thoughtcrime.securesms.components.webrtc.CallParticipantsState
import org.thoughtcrime.securesms.components.webrtc.InCallStatus
@@ -37,7 +39,6 @@ import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.events.CallParticipant
import org.thoughtcrime.securesms.events.CallParticipantId
import org.thoughtcrime.securesms.events.WebRtcViewModel
import org.thoughtcrime.securesms.groups.LiveGroup
import org.thoughtcrime.securesms.groups.ui.GroupMemberEntry
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.recipients.LiveRecipient
@@ -47,37 +48,37 @@ import org.thoughtcrime.securesms.service.webrtc.PendingParticipantCollection
import org.thoughtcrime.securesms.service.webrtc.state.PendingParticipantsState
import org.thoughtcrime.securesms.service.webrtc.state.WebRtcEphemeralState
import org.thoughtcrime.securesms.util.NetworkUtil
import org.thoughtcrime.securesms.util.SingleLiveEvent
import org.thoughtcrime.securesms.util.livedata.LiveDataUtil
import org.thoughtcrime.securesms.webrtc.audio.SignalAudioManager
import java.util.Collections
@OptIn(ExperimentalCoroutinesApi::class)
class WebRtcCallViewModel : ViewModel() {
private val internalMicrophoneEnabled = MutableLiveData(true)
private val isInPipMode = MutableLiveData(false)
private val webRtcControls = MutableLiveData(WebRtcControls.NONE)
private val foldableState = MutableLiveData(WebRtcControls.FoldableState.flat())
private val controlsWithFoldableState: LiveData<WebRtcControls> = LiveDataUtil.combineLatest(foldableState, webRtcControls, this::updateControlsFoldableState)
private val realWebRtcControls: LiveData<WebRtcControls> = LiveDataUtil.combineLatest(isInPipMode, controlsWithFoldableState, this::getRealWebRtcControls)
private val events = SingleLiveEvent<CallEvent>()
private val elapsed = BehaviorSubject.createDefault(-1L)
private val liveRecipient = MutableLiveData(Recipient.UNKNOWN.live())
private val participantsState = BehaviorSubject.createDefault(CallParticipantsState.STARTING_STATE)
private val callParticipantListUpdate = SingleLiveEvent<CallParticipantListUpdate>()
private val identityChangedRecipients = MutableLiveData<Collection<RecipientId>>(Collections.emptyList())
private val safetyNumberChangeEvent: LiveData<SafetyNumberChangeEvent> = LiveDataUtil.combineLatest(isInPipMode, identityChangedRecipients, ::SafetyNumberChangeEvent)
private val groupRecipient: LiveData<Recipient> = LiveDataUtil.filter(liveRecipient.switchMap(LiveRecipient::getLiveData), Recipient::isActiveGroup)
private val groupMembers: LiveData<List<GroupMemberEntry.FullMember>> = groupRecipient.switchMap { r -> LiveGroup(r.requireGroupId()).fullMembers }
private val groupMembersChanged: LiveData<List<GroupMemberEntry.FullMember>> = LiveDataUtil.skip(groupMembers, 1)
private val groupMembersCount: LiveData<Int> = groupMembers.map { it.size }
private val shouldShowSpeakerHint: Observable<Boolean> = participantsState.map(this::shouldShowSpeakerHint)
private val isLandscapeEnabled = MutableLiveData<Boolean>()
private val canEnterPipMode = MutableLiveData(false)
private val groupMemberStateUpdater = Observer<List<GroupMemberEntry.FullMember>> { m -> participantsState.onNext(CallParticipantsState.update(participantsState.value!!, m)) }
private val ephemeralState = MutableLiveData<WebRtcEphemeralState>()
private val recipientId = BehaviorProcessor.createDefault(RecipientId.UNKNOWN)
private val callPeerRepository = CallPeerRepository(viewModelScope)
private val pendingParticipants = BehaviorSubject.create<PendingParticipantCollection>()
private val internalMicrophoneEnabled = MutableStateFlow(true)
private val isInPipMode = MutableStateFlow(false)
private val webRtcControls = MutableStateFlow(WebRtcControls.NONE)
private val foldableState = MutableStateFlow(WebRtcControls.FoldableState.flat())
private val identityChangedRecipients = MutableStateFlow<Collection<RecipientId>>(Collections.emptyList())
private val isLandscapeEnabled = MutableStateFlow<Boolean?>(null)
private val canEnterPipMode = MutableStateFlow(false)
private val ephemeralState = MutableStateFlow<WebRtcEphemeralState?>(null)
private val controlsWithFoldableState: Flow<WebRtcControls> = combine(foldableState, webRtcControls, this::updateControlsFoldableState)
private val realWebRtcControls: StateFlow<WebRtcControls> = combine(isInPipMode, controlsWithFoldableState, this::getRealWebRtcControls)
.stateIn(viewModelScope, SharingStarted.Eagerly, WebRtcControls.NONE)
private val safetyNumberChangeEvent: Flow<SafetyNumberChangeEvent> = combine(isInPipMode, identityChangedRecipients, ::SafetyNumberChangeEvent)
private val events = MutableSharedFlow<CallEvent>()
private val callParticipantListUpdate = MutableSharedFlow<CallParticipantListUpdate>()
private val elapsed = MutableStateFlow(-1L)
private val participantsState = MutableStateFlow(CallParticipantsState.STARTING_STATE)
private val pendingParticipants = MutableStateFlow(PendingParticipantCollection())
private val groupMemberStateUpdater = FlowCollector<List<GroupMemberEntry.FullMember>> { m -> participantsState.update { CallParticipantsState.update(it, m) } }
private val shouldShowSpeakerHint: Flow<Boolean> = participantsState.map(this::shouldShowSpeakerHint)
private val elapsedTimeHandler = Handler(Looper.getMainLooper())
private val elapsedTimeRunnable = Runnable { handleTick() }
@@ -98,51 +99,51 @@ class WebRtcCallViewModel : ViewModel() {
private set
init {
groupMembers.observeForever(groupMemberStateUpdater)
viewModelScope.launch {
callPeerRepository.groupMembers.collect(groupMemberStateUpdater)
}
}
override fun onCleared() {
super.onCleared()
cancelTimer()
groupMembers.removeObserver(groupMemberStateUpdater)
}
val microphoneEnabled: LiveData<Boolean> get() = internalMicrophoneEnabled.distinctUntilChanged()
val microphoneEnabled: StateFlow<Boolean> get() = internalMicrophoneEnabled
fun getWebRtcControls(): LiveData<WebRtcControls> = realWebRtcControls
fun getWebRtcControls(): StateFlow<WebRtcControls> = realWebRtcControls
val recipient: LiveRecipient get() = liveRecipient.value!!
val recipient: LiveRecipient get() = callPeerRepository.liveRecipient.value
fun getRecipientFlowable(): Flowable<Recipient> {
return recipientId
.switchMap { Recipient.observable(it).toFlowable(BackpressureStrategy.LATEST) }
.observeOn(AndroidSchedulers.mainThread())
fun getRecipientFlow(): Flow<Recipient> {
return callPeerRepository.recipientId.flatMapLatest {
Recipient.observable(it).asFlow()
}
}
fun setRecipient(recipient: Recipient) {
recipientId.onNext(recipient.id)
liveRecipient.value = recipient.live()
callPeerRepository.recipientId.value = recipient.id
}
fun setFoldableState(foldableState: WebRtcControls.FoldableState) {
this.foldableState.postValue(foldableState)
ThreadUtil.runOnMain { participantsState.onNext(CallParticipantsState.update(participantsState.value!!, foldableState)) }
this.foldableState.update { foldableState }
participantsState.update { CallParticipantsState.update(it, foldableState) }
}
fun getEvents(): LiveData<CallEvent> {
fun getEvents(): Flow<CallEvent> {
return events
}
fun getInCallStatus(): Observable<InCallStatus> {
val elapsedTime: Observable<Long> = elapsed.map { timeInCall -> if (callConnectedTime == -1L) -1L else timeInCall }
fun getInCallStatus(): Flow<InCallStatus> {
val elapsedTime: Flow<Long> = elapsed.map { timeInCall -> if (callConnectedTime == -1L) -1L else timeInCall }
return Observable.combineLatest(
return combine(
elapsedTime,
pendingParticipants,
participantsState
) { time, pendingParticipants, participantsState ->
if (!recipient.get().isCallLink) {
return@combineLatest InCallStatus.ElapsedTime(time)
return@combine InCallStatus.ElapsedTime(time)
}
val pending: Set<PendingParticipantCollection.Entry> = pendingParticipants.getUnresolvedPendingParticipants()
@@ -152,12 +153,12 @@ class WebRtcCallViewModel : ViewModel() {
} else {
InCallStatus.JoinedCallLinkUsers(participantsState.participantCount.orElse(0L).toInt())
}
}.distinctUntilChanged().observeOn(AndroidSchedulers.mainThread())
}.distinctUntilChanged()
}
fun getCallControlsState(lifecycleOwner: LifecycleOwner): Flowable<CallControlsState> {
val groupSize: Flowable<Int> = recipientId.filter { it != RecipientId.UNKNOWN }
.switchMap { Recipient.observable(it).toFlowable(BackpressureStrategy.LATEST) }
fun getCallControlsState(): Flow<CallControlsState> {
val groupSize: Flow<Int> = callPeerRepository.recipientId.filter { it != RecipientId.UNKNOWN }
.flatMapLatest { Recipient.observable(it).asFlow() }
.map {
if (it.isActiveGroup) {
SignalDatabase.groups.getGroupMemberIds(it.requireGroupId(), GroupTable.MemberSet.FULL_MEMBERS_INCLUDING_SELF).size
@@ -166,47 +167,47 @@ class WebRtcCallViewModel : ViewModel() {
}
}
return Flowable.combineLatest(
callParticipantsState.toFlowable(BackpressureStrategy.LATEST),
getWebRtcControls().toPublisher(lifecycleOwner),
return combine(
callParticipantsState,
getWebRtcControls(),
groupSize,
CallControlsState::fromViewModelData
)
}
val callParticipantsState: Observable<CallParticipantsState> get() = participantsState
val callParticipantsState: Flow<CallParticipantsState> get() = participantsState
val callParticipantsStateSnapshot: CallParticipantsState? get() = participantsState.value
val callParticipantsStateSnapshot: CallParticipantsState get() = participantsState.value
fun getCallParticipantListUpdate(): LiveData<CallParticipantListUpdate> {
fun getCallParticipantListUpdate(): Flow<CallParticipantListUpdate> {
return callParticipantListUpdate
}
fun getSafetyNumberChangeEvent(): LiveData<SafetyNumberChangeEvent> {
fun getSafetyNumberChangeEvent(): Flow<SafetyNumberChangeEvent> {
return safetyNumberChangeEvent
}
fun getGroupMembersChanged(): LiveData<List<GroupMemberEntry.FullMember>> {
return groupMembersChanged
fun getGroupMembersChanged(): Flow<List<GroupMemberEntry.FullMember>> {
return callPeerRepository.groupMembersChanged
}
fun getGroupMemberCount(): LiveData<Int> {
return groupMembersCount
fun getGroupMemberCount(): Flow<Int> {
return callPeerRepository.groupMembersCount
}
fun shouldShowSpeakerHint(): Observable<Boolean> {
return shouldShowSpeakerHint.observeOn(AndroidSchedulers.mainThread())
fun shouldShowSpeakerHint(): Flow<Boolean> {
return shouldShowSpeakerHint
}
fun getCurrentAudioOutput(): WebRtcAudioOutput {
return getWebRtcControls().value!!.audioOutput
return getWebRtcControls().value.audioOutput
}
fun getEphemeralState(): LiveData<WebRtcEphemeralState> {
fun getEphemeralState(): Flow<WebRtcEphemeralState?> {
return ephemeralState
}
fun canEnterPipMode(): LiveData<Boolean> {
fun canEnterPipMode(): StateFlow<Boolean> {
return canEnterPipMode
}
@@ -214,22 +215,22 @@ class WebRtcCallViewModel : ViewModel() {
return answerWithVideoAvailable
}
fun getPendingParticipants(): Observable<PendingParticipantsState> {
val isInPipMode: Observable<Boolean> = participantsState.map { it.isInPipMode }.distinctUntilChanged()
return Observable.combineLatest(pendingParticipants, isInPipMode, ::PendingParticipantsState)
fun getPendingParticipants(): Flow<PendingParticipantsState> {
val isInPipMode: Flow<Boolean> = participantsState.map { it.isInPipMode }.distinctUntilChanged()
return combine(pendingParticipants, isInPipMode, ::PendingParticipantsState)
}
fun getPendingParticipantsSnapshot(): PendingParticipantCollection {
return pendingParticipants.value!!
return pendingParticipants.value
}
fun setIsInPipMode(isInPipMode: Boolean) {
this.isInPipMode.value = isInPipMode
participantsState.onNext(CallParticipantsState.update(participantsState.value!!, isInPipMode))
this.isInPipMode.update { isInPipMode }
participantsState.update { CallParticipantsState.update(it, isInPipMode) }
}
fun setIsLandscapeEnabled(isLandscapeEnabled: Boolean) {
this.isLandscapeEnabled.postValue(isLandscapeEnabled)
this.isLandscapeEnabled.update { isLandscapeEnabled }
}
@MainThread
@@ -238,22 +239,25 @@ class WebRtcCallViewModel : ViewModel() {
SignalStore.tooltips.markGroupCallSpeakerViewSeen()
}
val state = participantsState.value!!
val state = participantsState.value
if (showScreenShareTip &&
state.focusedParticipant.isScreenSharing &&
state.isViewingFocusedParticipant &&
page == CallParticipantsState.SelectedPage.GRID
) {
showScreenShareTip = false
events.value = CallEvent.ShowSwipeToSpeakerHint
viewModelScope.launch {
events.emit(CallEvent.ShowSwipeToSpeakerHint)
}
}
participantsState.onNext(CallParticipantsState.update(participantsState.value!!, page))
participantsState.update { CallParticipantsState.update(it, page) }
}
fun onLocalPictureInPictureClicked() {
val state = participantsState.value!!
participantsState.onNext(CallParticipantsState.setExpanded(participantsState.value!!, state.localRenderState != WebRtcLocalRenderState.EXPANDED))
participantsState.update {
CallParticipantsState.setExpanded(it, it.localRenderState != WebRtcLocalRenderState.EXPANDED)
}
}
fun onDismissedVideoTooltip() {
@@ -280,16 +284,20 @@ class WebRtcCallViewModel : ViewModel() {
val wasScreenSharing: Boolean = state.focusedParticipant.isScreenSharing
val newState: CallParticipantsState = CallParticipantsState.update(state, webRtcViewModel, enableVideo)
participantsState.onNext(newState)
participantsState.update { newState }
if (switchOnFirstScreenShare && !wasScreenSharing && newState.focusedParticipant.isScreenSharing) {
switchOnFirstScreenShare = false
events.value = CallEvent.SwitchToSpeaker
viewModelScope.launch {
events.emit(CallEvent.SwitchToSpeaker)
}
}
if (webRtcViewModel.groupState.isConnected) {
if (!containsPlaceholders(previousParticipantList)) {
val update = CallParticipantListUpdate.computeDeltaUpdate(previousParticipantList, webRtcViewModel.remoteParticipants)
callParticipantListUpdate.value = update
viewModelScope.launch {
callParticipantListUpdate.emit(update)
}
}
previousParticipantList = webRtcViewModel.remoteParticipants
@@ -312,7 +320,7 @@ class WebRtcCallViewModel : ViewModel() {
webRtcViewModel.remoteParticipants.size > CallParticipantsState.SMALL_GROUP_MAX
)
pendingParticipants.onNext(webRtcViewModel.pendingParticipants)
pendingParticipants.update { webRtcViewModel.pendingParticipants }
if (newState.isInOutgoingRingingMode) {
cancelTimer()
@@ -341,17 +349,17 @@ class WebRtcCallViewModel : ViewModel() {
if (localParticipant.cameraState.isEnabled) {
canDisplayTooltipIfNeeded = false
hasEnabledLocalVideo = true
events.value = CallEvent.DismissVideoTooltip
emitEvent(CallEvent.DismissVideoTooltip)
}
if (canDisplayTooltipIfNeeded && webRtcViewModel.isRemoteVideoEnabled && !hasEnabledLocalVideo) {
canDisplayTooltipIfNeeded = false
events.value = CallEvent.ShowVideoTooltip
emitEvent(CallEvent.ShowVideoTooltip)
}
if (canDisplayPopupIfNeeded && webRtcViewModel.isCellularConnection && NetworkUtil.isConnectedWifi(AppDependencies.application)) {
canDisplayPopupIfNeeded = false
events.value = CallEvent.ShowWifiToCellularPopup
emitEvent(CallEvent.ShowWifiToCellularPopup)
} else if (!webRtcViewModel.isCellularConnection) {
canDisplayPopupIfNeeded = true
}
@@ -363,7 +371,7 @@ class WebRtcCallViewModel : ViewModel() {
newState.allRemoteParticipants.isNotEmpty()
) {
canDisplaySwitchCameraTooltipIfNeeded = false
events.value = CallEvent.ShowSwitchCameraTooltip
emitEvent(CallEvent.ShowSwitchCameraTooltip)
}
}
@@ -379,13 +387,19 @@ class WebRtcCallViewModel : ViewModel() {
WebRtcCallRepository.getIdentityRecords(recipient) { identityRecords ->
if (identityRecords.isUntrusted(false) || identityRecords.isUnverified(false)) {
val records = identityRecords.unverifiedRecords + identityRecords.untrustedRecords
events.postValue(CallEvent.ShowGroupCallSafetyNumberChange(records))
emitEvent(CallEvent.ShowGroupCallSafetyNumberChange(records))
} else {
events.postValue(CallEvent.StartCall(isVideoCall))
emitEvent(CallEvent.StartCall(isVideoCall))
}
}
} else {
events.postValue(CallEvent.StartCall(isVideoCall))
emitEvent(CallEvent.StartCall(isVideoCall))
}
}
private fun emitEvent(callEvent: CallEvent) {
viewModelScope.launch {
events.emit(callEvent)
}
}
@@ -402,7 +416,7 @@ class WebRtcCallViewModel : ViewModel() {
}
val newValue = (System.currentTimeMillis() - callConnectedTime) / 1000
elapsed.onNext(newValue)
elapsed.update { newValue }
elapsedTimeHandler.postDelayed(elapsedTimeRunnable, 1000)
}