From 5adba60e75e62daa4728afc2858dc7e5dc71bb3a Mon Sep 17 00:00:00 2001 From: Alex Hart Date: Fri, 7 Feb 2025 11:14:49 -0400 Subject: [PATCH] Migrate WebRtcCallViewModel to Flow apis. --- .../webrtc/PendingParticipantsBottomSheet.kt | 13 +- .../webrtc/controls/CallInfoView.kt | 35 +-- .../webrtc/controls/RaiseHandSnackbar.kt | 37 +-- .../webrtc/v2/CallPeerRepository.kt | 57 +++++ .../webrtc/v2/WebRtcCallActivity.kt | 127 +++++++--- .../webrtc/v2/WebRtcCallViewModel.kt | 222 ++++++++++-------- 6 files changed, 311 insertions(+), 180 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/CallPeerRepository.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/PendingParticipantsBottomSheet.kt b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/PendingParticipantsBottomSheet.kt index 833728dea1..59dd596512 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/PendingParticipantsBottomSheet.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/PendingParticipantsBottomSheet.kt @@ -23,8 +23,9 @@ import androidx.compose.material3.MaterialTheme import androidx.compose.material3.Text import androidx.compose.material3.TextButton import androidx.compose.runtime.Composable +import androidx.compose.runtime.collectAsState +import androidx.compose.runtime.getValue import androidx.compose.runtime.remember -import androidx.compose.runtime.rxjava3.subscribeAsState import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.graphics.Color @@ -39,6 +40,7 @@ import androidx.compose.ui.unit.dp import androidx.compose.ui.viewinterop.AndroidView import androidx.core.os.bundleOf import androidx.fragment.app.setFragmentResult +import kotlinx.coroutines.flow.map import org.signal.core.ui.BottomSheets import org.signal.core.ui.Buttons import org.signal.core.ui.DarkPreview @@ -81,12 +83,13 @@ class PendingParticipantsBottomSheet : ComposeBottomSheetDialogFragment() { System.currentTimeMillis().milliseconds } - val participants = viewModel.getPendingParticipants() - .map { it.pendingParticipantCollection.getAllPendingParticipants(launchTime).toList() } - .subscribeAsState(initial = emptyList()) + val participants by remember { + viewModel.getPendingParticipants() + .map { it.pendingParticipantCollection.getAllPendingParticipants(launchTime).toList() } + }.collectAsState(initial = emptyList()) PendingParticipantsSheet( - pendingParticipants = participants.value, + pendingParticipants = participants, onApproveAll = this::onApproveAll, onDenyAll = this::onDenyAll, onApprove = this::onApprove, diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/CallInfoView.kt b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/CallInfoView.kt index 1cb6dcdfce..30e4e3cf1c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/CallInfoView.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/CallInfoView.kt @@ -30,11 +30,11 @@ import androidx.compose.material3.Text import androidx.compose.material3.TextButton import androidx.compose.runtime.Composable import androidx.compose.runtime.LaunchedEffect +import androidx.compose.runtime.collectAsState import androidx.compose.runtime.getValue import androidx.compose.runtime.livedata.observeAsState import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember -import androidx.compose.runtime.rxjava3.subscribeAsState import androidx.compose.runtime.setValue import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier @@ -51,6 +51,7 @@ import androidx.lifecycle.toLiveData import com.google.android.material.dialog.MaterialAlertDialogBuilder import io.reactivex.rxjava3.core.BackpressureStrategy import io.reactivex.rxjava3.core.Observable +import kotlinx.coroutines.flow.map import org.signal.core.ui.Dialogs import org.signal.core.ui.Dividers import org.signal.core.ui.Previews @@ -83,22 +84,22 @@ object CallInfoView { callbacks: Callbacks, modifier: Modifier ) { - val participantsState: ParticipantsState by webRtcCallViewModel.callParticipantsState - .toFlowable(BackpressureStrategy.LATEST) - .map { state -> - ParticipantsState( - inCallLobby = state.callState == WebRtcViewModel.State.CALL_PRE_JOIN, - ringGroup = state.ringGroup, - includeSelf = state.groupCallState === WebRtcViewModel.GroupCallState.CONNECTED_AND_JOINED || state.groupCallState === WebRtcViewModel.GroupCallState.IDLE, - participantCount = if (state.participantCount.isPresent) state.participantCount.asLong.toInt() else 0, - remoteParticipants = state.allRemoteParticipants.sortedBy { it.callParticipantId.recipientId }, - localParticipant = state.localParticipant, - groupMembers = state.groupMembers.filterNot { it.member.isSelf }, - callRecipient = state.recipient, - raisedHands = state.raisedHands - ) - } - .subscribeAsState(ParticipantsState()) + val participantsState: ParticipantsState by remember { + webRtcCallViewModel.callParticipantsState + .map { state -> + ParticipantsState( + inCallLobby = state.callState == WebRtcViewModel.State.CALL_PRE_JOIN, + ringGroup = state.ringGroup, + includeSelf = state.groupCallState === WebRtcViewModel.GroupCallState.CONNECTED_AND_JOINED || state.groupCallState === WebRtcViewModel.GroupCallState.IDLE, + participantCount = if (state.participantCount.isPresent) state.participantCount.asLong.toInt() else 0, + remoteParticipants = state.allRemoteParticipants.sortedBy { it.callParticipantId.recipientId }, + localParticipant = state.localParticipant, + groupMembers = state.groupMembers.filterNot { it.member.isSelf }, + callRecipient = state.recipient, + raisedHands = state.raisedHands + ) + } + }.collectAsState(ParticipantsState()) val controlAndInfoState: ControlAndInfoState by controlsAndInfoViewModel.state diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/RaiseHandSnackbar.kt b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/RaiseHandSnackbar.kt index 7520a06457..dfc671d099 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/RaiseHandSnackbar.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/controls/RaiseHandSnackbar.kt @@ -26,11 +26,11 @@ import androidx.compose.material3.Text import androidx.compose.material3.TextButton import androidx.compose.runtime.Composable import androidx.compose.runtime.LaunchedEffect +import androidx.compose.runtime.collectAsState import androidx.compose.runtime.derivedStateOf import androidx.compose.runtime.getValue import androidx.compose.runtime.mutableStateOf import androidx.compose.runtime.remember -import androidx.compose.runtime.rxjava3.subscribeAsState import androidx.compose.runtime.setValue import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier @@ -43,8 +43,8 @@ import androidx.compose.ui.res.vectorResource import androidx.compose.ui.semantics.Role import androidx.compose.ui.tooling.preview.Preview import androidx.compose.ui.unit.dp -import io.reactivex.rxjava3.core.BackpressureStrategy import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.map import org.signal.core.ui.theme.SignalTheme import org.thoughtcrime.securesms.R import org.thoughtcrime.securesms.components.webrtc.v2.WebRtcCallViewModel @@ -65,26 +65,27 @@ object RaiseHandSnackbar { fun View(webRtcCallViewModel: WebRtcCallViewModel, showCallInfoListener: () -> Unit, modifier: Modifier = Modifier) { var expansionState by remember { mutableStateOf(ExpansionState(shouldExpand = false, forced = false)) } - val raisedHandsState by webRtcCallViewModel.callParticipantsState - .toFlowable(BackpressureStrategy.LATEST) - .map { state -> - val raisedHands = state.raisedHands.sortedBy { - if (it.sender.isSelf) { - if (it.sender.isPrimary) { - 0 + val raisedHandsState by remember { + webRtcCallViewModel.callParticipantsState + .map { state -> + val raisedHands = state.raisedHands.sortedBy { + if (it.sender.isSelf) { + if (it.sender.isPrimary) { + 0 + } else { + 1 + } } else { - 1 + it.timestamp } - } else { - it.timestamp } + val shouldExpand = RaiseHandState.shouldExpand(raisedHands) + if (!expansionState.forced) { + expansionState = ExpansionState(shouldExpand, false) + } + raisedHands } - val shouldExpand = RaiseHandState.shouldExpand(raisedHands) - if (!expansionState.forced) { - expansionState = ExpansionState(shouldExpand, false) - } - raisedHands - }.subscribeAsState(initial = emptyList()) + }.collectAsState(initial = emptyList()) val state by remember { derivedStateOf { diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/CallPeerRepository.kt b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/CallPeerRepository.kt new file mode 100644 index 0000000000..47208cb5db --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/CallPeerRepository.kt @@ -0,0 +1,57 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.thoughtcrime.securesms.components.webrtc.v2 + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.stateIn +import kotlinx.coroutines.rx3.asFlow +import kotlinx.coroutines.withContext +import org.thoughtcrime.securesms.database.SignalDatabase +import org.thoughtcrime.securesms.database.model.GroupRecord +import org.thoughtcrime.securesms.groups.ui.GroupMemberEntry +import org.thoughtcrime.securesms.recipients.LiveRecipient +import org.thoughtcrime.securesms.recipients.Recipient +import org.thoughtcrime.securesms.recipients.RecipientId + +/** + * Repository providing different fields about the call peer. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class CallPeerRepository( + externalScope: CoroutineScope +) { + val recipientId = MutableStateFlow(RecipientId.UNKNOWN) + val liveRecipient: StateFlow = recipientId.mapLatest { Recipient.live(it) } + .stateIn(externalScope, SharingStarted.Eagerly, Recipient.live(RecipientId.UNKNOWN)) + + private val recipient: Flow = liveRecipient.flatMapLatest { it.observable().asFlow() } + private val groupRecipient = recipient.filter { it.isActiveGroup } + private val groupRecord: Flow = groupRecipient.mapLatest { + withContext(Dispatchers.IO) { + SignalDatabase.groups.getGroup(it.requireGroupId()).get() + } + } + + val groupMembers: Flow> = groupRecord.mapLatest { record -> + withContext(Dispatchers.IO) { + record.members.map { Recipient.resolved(it) }.map { GroupMemberEntry.FullMember(it, record.isAdmin(it)) } + } + } + + val groupMembersChanged: Flow> = groupMembers.drop(1) + val groupMembersCount: Flow = groupMembers.map { it.size } +} diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallActivity.kt index 848a67e136..1215d23959 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallActivity.kt @@ -25,15 +25,20 @@ import androidx.annotation.RequiresApi import androidx.appcompat.app.AppCompatDelegate import androidx.core.content.ContextCompat import androidx.core.util.Consumer -import androidx.lifecycle.toLiveData +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.lifecycleScope +import androidx.lifecycle.repeatOnLifecycle import androidx.window.java.layout.WindowInfoTrackerCallbackAdapter import androidx.window.layout.FoldingFeature import androidx.window.layout.WindowInfoTracker import androidx.window.layout.WindowLayoutInfo import com.google.android.material.dialog.MaterialAlertDialogBuilder import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers -import io.reactivex.rxjava3.core.BackpressureStrategy import io.reactivex.rxjava3.disposables.Disposable +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.launch import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.Subscribe import org.greenrobot.eventbus.ThreadMode @@ -88,7 +93,6 @@ import org.thoughtcrime.securesms.util.TextSecurePreferences import org.thoughtcrime.securesms.util.ThrottledDebouncer import org.thoughtcrime.securesms.util.VibrateUtil import org.thoughtcrime.securesms.util.WindowUtil -import org.thoughtcrime.securesms.util.livedata.LiveDataUtil import org.thoughtcrime.securesms.util.visible import org.thoughtcrime.securesms.webrtc.CallParticipantsViewState import org.thoughtcrime.securesms.webrtc.audio.SignalAudioManager @@ -211,12 +215,12 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re if (!hasCameraPermission() && !hasAudioPermission()) { askCameraAudioPermissions { - callScreen.setMicEnabled(viewModel.microphoneEnabled.value ?: false) + callScreen.setMicEnabled(viewModel.microphoneEnabled.value) handleSetMuteVideo(false) } } else if (!hasAudioPermission()) { askAudioPermissions { - callScreen.setMicEnabled(viewModel.microphoneEnabled.value ?: false) + callScreen.setMicEnabled(viewModel.microphoneEnabled.value) } } } @@ -498,36 +502,74 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re viewModel.setIsLandscapeEnabled(true) viewModel.setIsInPipMode(isInPipMode()) - viewModel.microphoneEnabled.observe(this, callScreen::setMicEnabled) - viewModel.getWebRtcControls().observe(this) { controls -> - callScreen.setWebRtcControls(controls) - controlsAndInfo.updateControls(controls) + + lifecycleScope.launch { + lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { + launch { + viewModel.microphoneEnabled.collectLatest { + callScreen.setMicEnabled(it) + } + } + + launch { + viewModel.getWebRtcControls().collectLatest { + callScreen.setWebRtcControls(it) + controlsAndInfo.updateControls(it) + } + } + + launch { + viewModel.getEvents().collect { handleViewModelEvent(it) } + } + + launch { + viewModel.getInCallStatus().collectLatest { + handleInCallStatus(it) + } + } + + launch { + viewModel.getRecipientFlow().collectLatest { + callScreen.setRecipient(it) + } + } + + launch { + val isStartedFromCallLink = getCallIntent().isStartedFromCallLink + combine( + viewModel.callParticipantsState, + viewModel.getEphemeralState().filterNotNull() + ) { state, ephemeralState -> + CallParticipantsViewState(state, ephemeralState, orientation == Orientation.PORTRAIT_BOTTOM_EDGE, true, isStartedFromCallLink) + }.collectLatest(callScreen::updateCallParticipants) + } + + launch { + viewModel.getCallParticipantListUpdate().collectLatest(participantUpdateWindow::addCallParticipantListUpdate) + } + + launch { + viewModel.getSafetyNumberChangeEvent().collect { handleSafetyNumberChangeEvent(it) } + } + + launch { + viewModel.getGroupMembersChanged().collectLatest { updateGroupMembersForGroupCall() } + } + + launch { + viewModel.getGroupMemberCount().collectLatest { handleGroupMemberCountChange(it) } + } + + launch { + viewModel.shouldShowSpeakerHint().collectLatest { updateSpeakerHint(it) } + } + } } - viewModel.getEvents().observe(this, this::handleViewModelEvent) - - lifecycleDisposable.add(viewModel.getInCallStatus().subscribe(this::handleInCallStatus)) - lifecycleDisposable.add(viewModel.getRecipientFlowable().subscribe(callScreen::setRecipient)) - - val isStartedFromCallLink = getCallIntent().isStartedFromCallLink - LiveDataUtil.combineLatest( - viewModel.callParticipantsState.toFlowable(BackpressureStrategy.LATEST).toLiveData(), - viewModel.getEphemeralState() - ) { state, ephemeralState -> - CallParticipantsViewState(state, ephemeralState, orientation == Orientation.PORTRAIT_BOTTOM_EDGE, true, isStartedFromCallLink) - }.observe(this, callScreen::updateCallParticipants) - - viewModel.getCallParticipantListUpdate().observe(this, participantUpdateWindow::addCallParticipantListUpdate) - viewModel.getSafetyNumberChangeEvent().observe(this, this::handleSafetyNumberChangeEvent) - viewModel.getGroupMembersChanged().observe(this) { updateGroupMembersForGroupCall() } - viewModel.getGroupMemberCount().observe(this, this::handleGroupMemberCountChange) - lifecycleDisposable.add(viewModel.shouldShowSpeakerHint().subscribe(this::updateSpeakerHint)) callScreen.viewTreeObserver.addOnGlobalLayoutListener { val state = viewModel.callParticipantsStateSnapshot - if (state != null) { - if (state.needsNewRequestSizes()) { - requestNewSizesThrottle.publish { AppDependencies.signalCallManager.updateRenderedResolutions() } - } + if (state.needsNewRequestSizes()) { + requestNewSizesThrottle.publish { AppDependencies.signalCallManager.updateRenderedResolutions() } } } @@ -542,7 +584,14 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re } callScreen.setPendingParticipantsViewListener(PendingParticipantsViewListener()) - lifecycleDisposable += viewModel.getPendingParticipants().subscribe(callScreen::updatePendingParticipantsList) + + lifecycleScope.launch { + lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { + launch { + viewModel.getPendingParticipants().collect(callScreen::updatePendingParticipantsList) + } + } + } } private fun initializePictureInPictureParams() { @@ -558,9 +607,15 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re pipBuilderParams.setAspectRatio(aspectRatio) if (Build.VERSION.SDK_INT >= 31) { - viewModel.canEnterPipMode().observe(this) { - pipBuilderParams.setAutoEnterEnabled(it) - tryToSetPictureInPictureParams() + lifecycleScope.launch { + lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) { + launch { + viewModel.canEnterPipMode().collectLatest { + pipBuilderParams.setAutoEnterEnabled(it) + tryToSetPictureInPictureParams() + } + } + } } } else { tryToSetPictureInPictureParams() @@ -1077,7 +1132,7 @@ class WebRtcCallActivity : BaseActivity(), SafetyNumberChangeDialog.Callback, Re override fun onHidden() { val controlState = viewModel.getWebRtcControls().value - if (controlState == null || !controlState.displayErrorControls()) { + if (!controlState.displayErrorControls()) { fullscreenHelper.hideSystemUI() videoTooltip?.dismiss() } diff --git a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallViewModel.kt b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallViewModel.kt index 8032c15fcf..a437de1772 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallViewModel.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/components/webrtc/v2/WebRtcCallViewModel.kt @@ -8,22 +8,24 @@ package org.thoughtcrime.securesms.components.webrtc.v2 import android.os.Handler import android.os.Looper import androidx.annotation.MainThread -import androidx.lifecycle.LifecycleOwner -import androidx.lifecycle.LiveData -import androidx.lifecycle.MutableLiveData -import androidx.lifecycle.Observer import androidx.lifecycle.ViewModel -import androidx.lifecycle.distinctUntilChanged -import androidx.lifecycle.map -import androidx.lifecycle.switchMap -import androidx.lifecycle.toPublisher -import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers -import io.reactivex.rxjava3.core.BackpressureStrategy -import io.reactivex.rxjava3.core.Flowable -import io.reactivex.rxjava3.core.Observable -import io.reactivex.rxjava3.processors.BehaviorProcessor -import io.reactivex.rxjava3.subjects.BehaviorSubject -import org.signal.core.util.ThreadUtil +import androidx.lifecycle.viewModelScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.combine +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.stateIn +import kotlinx.coroutines.flow.update +import kotlinx.coroutines.launch +import kotlinx.coroutines.rx3.asFlow import org.thoughtcrime.securesms.components.webrtc.CallParticipantListUpdate import org.thoughtcrime.securesms.components.webrtc.CallParticipantsState import org.thoughtcrime.securesms.components.webrtc.InCallStatus @@ -37,7 +39,6 @@ import org.thoughtcrime.securesms.dependencies.AppDependencies import org.thoughtcrime.securesms.events.CallParticipant import org.thoughtcrime.securesms.events.CallParticipantId import org.thoughtcrime.securesms.events.WebRtcViewModel -import org.thoughtcrime.securesms.groups.LiveGroup import org.thoughtcrime.securesms.groups.ui.GroupMemberEntry import org.thoughtcrime.securesms.keyvalue.SignalStore import org.thoughtcrime.securesms.recipients.LiveRecipient @@ -47,37 +48,37 @@ import org.thoughtcrime.securesms.service.webrtc.PendingParticipantCollection import org.thoughtcrime.securesms.service.webrtc.state.PendingParticipantsState import org.thoughtcrime.securesms.service.webrtc.state.WebRtcEphemeralState import org.thoughtcrime.securesms.util.NetworkUtil -import org.thoughtcrime.securesms.util.SingleLiveEvent -import org.thoughtcrime.securesms.util.livedata.LiveDataUtil import org.thoughtcrime.securesms.webrtc.audio.SignalAudioManager import java.util.Collections +@OptIn(ExperimentalCoroutinesApi::class) class WebRtcCallViewModel : ViewModel() { - private val internalMicrophoneEnabled = MutableLiveData(true) - private val isInPipMode = MutableLiveData(false) - private val webRtcControls = MutableLiveData(WebRtcControls.NONE) - private val foldableState = MutableLiveData(WebRtcControls.FoldableState.flat()) - private val controlsWithFoldableState: LiveData = LiveDataUtil.combineLatest(foldableState, webRtcControls, this::updateControlsFoldableState) - private val realWebRtcControls: LiveData = LiveDataUtil.combineLatest(isInPipMode, controlsWithFoldableState, this::getRealWebRtcControls) - private val events = SingleLiveEvent() - private val elapsed = BehaviorSubject.createDefault(-1L) - private val liveRecipient = MutableLiveData(Recipient.UNKNOWN.live()) - private val participantsState = BehaviorSubject.createDefault(CallParticipantsState.STARTING_STATE) - private val callParticipantListUpdate = SingleLiveEvent() - private val identityChangedRecipients = MutableLiveData>(Collections.emptyList()) - private val safetyNumberChangeEvent: LiveData = LiveDataUtil.combineLatest(isInPipMode, identityChangedRecipients, ::SafetyNumberChangeEvent) - private val groupRecipient: LiveData = LiveDataUtil.filter(liveRecipient.switchMap(LiveRecipient::getLiveData), Recipient::isActiveGroup) - private val groupMembers: LiveData> = groupRecipient.switchMap { r -> LiveGroup(r.requireGroupId()).fullMembers } - private val groupMembersChanged: LiveData> = LiveDataUtil.skip(groupMembers, 1) - private val groupMembersCount: LiveData = groupMembers.map { it.size } - private val shouldShowSpeakerHint: Observable = participantsState.map(this::shouldShowSpeakerHint) - private val isLandscapeEnabled = MutableLiveData() - private val canEnterPipMode = MutableLiveData(false) - private val groupMemberStateUpdater = Observer> { m -> participantsState.onNext(CallParticipantsState.update(participantsState.value!!, m)) } - private val ephemeralState = MutableLiveData() - private val recipientId = BehaviorProcessor.createDefault(RecipientId.UNKNOWN) + private val callPeerRepository = CallPeerRepository(viewModelScope) - private val pendingParticipants = BehaviorSubject.create() + private val internalMicrophoneEnabled = MutableStateFlow(true) + private val isInPipMode = MutableStateFlow(false) + private val webRtcControls = MutableStateFlow(WebRtcControls.NONE) + private val foldableState = MutableStateFlow(WebRtcControls.FoldableState.flat()) + private val identityChangedRecipients = MutableStateFlow>(Collections.emptyList()) + private val isLandscapeEnabled = MutableStateFlow(null) + private val canEnterPipMode = MutableStateFlow(false) + private val ephemeralState = MutableStateFlow(null) + + private val controlsWithFoldableState: Flow = combine(foldableState, webRtcControls, this::updateControlsFoldableState) + private val realWebRtcControls: StateFlow = combine(isInPipMode, controlsWithFoldableState, this::getRealWebRtcControls) + .stateIn(viewModelScope, SharingStarted.Eagerly, WebRtcControls.NONE) + private val safetyNumberChangeEvent: Flow = combine(isInPipMode, identityChangedRecipients, ::SafetyNumberChangeEvent) + + private val events = MutableSharedFlow() + private val callParticipantListUpdate = MutableSharedFlow() + + private val elapsed = MutableStateFlow(-1L) + private val participantsState = MutableStateFlow(CallParticipantsState.STARTING_STATE) + private val pendingParticipants = MutableStateFlow(PendingParticipantCollection()) + + private val groupMemberStateUpdater = FlowCollector> { m -> participantsState.update { CallParticipantsState.update(it, m) } } + + private val shouldShowSpeakerHint: Flow = participantsState.map(this::shouldShowSpeakerHint) private val elapsedTimeHandler = Handler(Looper.getMainLooper()) private val elapsedTimeRunnable = Runnable { handleTick() } @@ -98,51 +99,51 @@ class WebRtcCallViewModel : ViewModel() { private set init { - groupMembers.observeForever(groupMemberStateUpdater) + viewModelScope.launch { + callPeerRepository.groupMembers.collect(groupMemberStateUpdater) + } } override fun onCleared() { super.onCleared() cancelTimer() - groupMembers.removeObserver(groupMemberStateUpdater) } - val microphoneEnabled: LiveData get() = internalMicrophoneEnabled.distinctUntilChanged() + val microphoneEnabled: StateFlow get() = internalMicrophoneEnabled - fun getWebRtcControls(): LiveData = realWebRtcControls + fun getWebRtcControls(): StateFlow = realWebRtcControls - val recipient: LiveRecipient get() = liveRecipient.value!! + val recipient: LiveRecipient get() = callPeerRepository.liveRecipient.value - fun getRecipientFlowable(): Flowable { - return recipientId - .switchMap { Recipient.observable(it).toFlowable(BackpressureStrategy.LATEST) } - .observeOn(AndroidSchedulers.mainThread()) + fun getRecipientFlow(): Flow { + return callPeerRepository.recipientId.flatMapLatest { + Recipient.observable(it).asFlow() + } } fun setRecipient(recipient: Recipient) { - recipientId.onNext(recipient.id) - liveRecipient.value = recipient.live() + callPeerRepository.recipientId.value = recipient.id } fun setFoldableState(foldableState: WebRtcControls.FoldableState) { - this.foldableState.postValue(foldableState) - ThreadUtil.runOnMain { participantsState.onNext(CallParticipantsState.update(participantsState.value!!, foldableState)) } + this.foldableState.update { foldableState } + participantsState.update { CallParticipantsState.update(it, foldableState) } } - fun getEvents(): LiveData { + fun getEvents(): Flow { return events } - fun getInCallStatus(): Observable { - val elapsedTime: Observable = elapsed.map { timeInCall -> if (callConnectedTime == -1L) -1L else timeInCall } + fun getInCallStatus(): Flow { + val elapsedTime: Flow = elapsed.map { timeInCall -> if (callConnectedTime == -1L) -1L else timeInCall } - return Observable.combineLatest( + return combine( elapsedTime, pendingParticipants, participantsState ) { time, pendingParticipants, participantsState -> if (!recipient.get().isCallLink) { - return@combineLatest InCallStatus.ElapsedTime(time) + return@combine InCallStatus.ElapsedTime(time) } val pending: Set = pendingParticipants.getUnresolvedPendingParticipants() @@ -152,12 +153,12 @@ class WebRtcCallViewModel : ViewModel() { } else { InCallStatus.JoinedCallLinkUsers(participantsState.participantCount.orElse(0L).toInt()) } - }.distinctUntilChanged().observeOn(AndroidSchedulers.mainThread()) + }.distinctUntilChanged() } - fun getCallControlsState(lifecycleOwner: LifecycleOwner): Flowable { - val groupSize: Flowable = recipientId.filter { it != RecipientId.UNKNOWN } - .switchMap { Recipient.observable(it).toFlowable(BackpressureStrategy.LATEST) } + fun getCallControlsState(): Flow { + val groupSize: Flow = callPeerRepository.recipientId.filter { it != RecipientId.UNKNOWN } + .flatMapLatest { Recipient.observable(it).asFlow() } .map { if (it.isActiveGroup) { SignalDatabase.groups.getGroupMemberIds(it.requireGroupId(), GroupTable.MemberSet.FULL_MEMBERS_INCLUDING_SELF).size @@ -166,47 +167,47 @@ class WebRtcCallViewModel : ViewModel() { } } - return Flowable.combineLatest( - callParticipantsState.toFlowable(BackpressureStrategy.LATEST), - getWebRtcControls().toPublisher(lifecycleOwner), + return combine( + callParticipantsState, + getWebRtcControls(), groupSize, CallControlsState::fromViewModelData ) } - val callParticipantsState: Observable get() = participantsState + val callParticipantsState: Flow get() = participantsState - val callParticipantsStateSnapshot: CallParticipantsState? get() = participantsState.value + val callParticipantsStateSnapshot: CallParticipantsState get() = participantsState.value - fun getCallParticipantListUpdate(): LiveData { + fun getCallParticipantListUpdate(): Flow { return callParticipantListUpdate } - fun getSafetyNumberChangeEvent(): LiveData { + fun getSafetyNumberChangeEvent(): Flow { return safetyNumberChangeEvent } - fun getGroupMembersChanged(): LiveData> { - return groupMembersChanged + fun getGroupMembersChanged(): Flow> { + return callPeerRepository.groupMembersChanged } - fun getGroupMemberCount(): LiveData { - return groupMembersCount + fun getGroupMemberCount(): Flow { + return callPeerRepository.groupMembersCount } - fun shouldShowSpeakerHint(): Observable { - return shouldShowSpeakerHint.observeOn(AndroidSchedulers.mainThread()) + fun shouldShowSpeakerHint(): Flow { + return shouldShowSpeakerHint } fun getCurrentAudioOutput(): WebRtcAudioOutput { - return getWebRtcControls().value!!.audioOutput + return getWebRtcControls().value.audioOutput } - fun getEphemeralState(): LiveData { + fun getEphemeralState(): Flow { return ephemeralState } - fun canEnterPipMode(): LiveData { + fun canEnterPipMode(): StateFlow { return canEnterPipMode } @@ -214,22 +215,22 @@ class WebRtcCallViewModel : ViewModel() { return answerWithVideoAvailable } - fun getPendingParticipants(): Observable { - val isInPipMode: Observable = participantsState.map { it.isInPipMode }.distinctUntilChanged() - return Observable.combineLatest(pendingParticipants, isInPipMode, ::PendingParticipantsState) + fun getPendingParticipants(): Flow { + val isInPipMode: Flow = participantsState.map { it.isInPipMode }.distinctUntilChanged() + return combine(pendingParticipants, isInPipMode, ::PendingParticipantsState) } fun getPendingParticipantsSnapshot(): PendingParticipantCollection { - return pendingParticipants.value!! + return pendingParticipants.value } fun setIsInPipMode(isInPipMode: Boolean) { - this.isInPipMode.value = isInPipMode - participantsState.onNext(CallParticipantsState.update(participantsState.value!!, isInPipMode)) + this.isInPipMode.update { isInPipMode } + participantsState.update { CallParticipantsState.update(it, isInPipMode) } } fun setIsLandscapeEnabled(isLandscapeEnabled: Boolean) { - this.isLandscapeEnabled.postValue(isLandscapeEnabled) + this.isLandscapeEnabled.update { isLandscapeEnabled } } @MainThread @@ -238,22 +239,25 @@ class WebRtcCallViewModel : ViewModel() { SignalStore.tooltips.markGroupCallSpeakerViewSeen() } - val state = participantsState.value!! + val state = participantsState.value if (showScreenShareTip && state.focusedParticipant.isScreenSharing && state.isViewingFocusedParticipant && page == CallParticipantsState.SelectedPage.GRID ) { showScreenShareTip = false - events.value = CallEvent.ShowSwipeToSpeakerHint + viewModelScope.launch { + events.emit(CallEvent.ShowSwipeToSpeakerHint) + } } - participantsState.onNext(CallParticipantsState.update(participantsState.value!!, page)) + participantsState.update { CallParticipantsState.update(it, page) } } fun onLocalPictureInPictureClicked() { - val state = participantsState.value!! - participantsState.onNext(CallParticipantsState.setExpanded(participantsState.value!!, state.localRenderState != WebRtcLocalRenderState.EXPANDED)) + participantsState.update { + CallParticipantsState.setExpanded(it, it.localRenderState != WebRtcLocalRenderState.EXPANDED) + } } fun onDismissedVideoTooltip() { @@ -280,16 +284,20 @@ class WebRtcCallViewModel : ViewModel() { val wasScreenSharing: Boolean = state.focusedParticipant.isScreenSharing val newState: CallParticipantsState = CallParticipantsState.update(state, webRtcViewModel, enableVideo) - participantsState.onNext(newState) + participantsState.update { newState } if (switchOnFirstScreenShare && !wasScreenSharing && newState.focusedParticipant.isScreenSharing) { switchOnFirstScreenShare = false - events.value = CallEvent.SwitchToSpeaker + viewModelScope.launch { + events.emit(CallEvent.SwitchToSpeaker) + } } if (webRtcViewModel.groupState.isConnected) { if (!containsPlaceholders(previousParticipantList)) { val update = CallParticipantListUpdate.computeDeltaUpdate(previousParticipantList, webRtcViewModel.remoteParticipants) - callParticipantListUpdate.value = update + viewModelScope.launch { + callParticipantListUpdate.emit(update) + } } previousParticipantList = webRtcViewModel.remoteParticipants @@ -312,7 +320,7 @@ class WebRtcCallViewModel : ViewModel() { webRtcViewModel.remoteParticipants.size > CallParticipantsState.SMALL_GROUP_MAX ) - pendingParticipants.onNext(webRtcViewModel.pendingParticipants) + pendingParticipants.update { webRtcViewModel.pendingParticipants } if (newState.isInOutgoingRingingMode) { cancelTimer() @@ -341,17 +349,17 @@ class WebRtcCallViewModel : ViewModel() { if (localParticipant.cameraState.isEnabled) { canDisplayTooltipIfNeeded = false hasEnabledLocalVideo = true - events.value = CallEvent.DismissVideoTooltip + emitEvent(CallEvent.DismissVideoTooltip) } if (canDisplayTooltipIfNeeded && webRtcViewModel.isRemoteVideoEnabled && !hasEnabledLocalVideo) { canDisplayTooltipIfNeeded = false - events.value = CallEvent.ShowVideoTooltip + emitEvent(CallEvent.ShowVideoTooltip) } if (canDisplayPopupIfNeeded && webRtcViewModel.isCellularConnection && NetworkUtil.isConnectedWifi(AppDependencies.application)) { canDisplayPopupIfNeeded = false - events.value = CallEvent.ShowWifiToCellularPopup + emitEvent(CallEvent.ShowWifiToCellularPopup) } else if (!webRtcViewModel.isCellularConnection) { canDisplayPopupIfNeeded = true } @@ -363,7 +371,7 @@ class WebRtcCallViewModel : ViewModel() { newState.allRemoteParticipants.isNotEmpty() ) { canDisplaySwitchCameraTooltipIfNeeded = false - events.value = CallEvent.ShowSwitchCameraTooltip + emitEvent(CallEvent.ShowSwitchCameraTooltip) } } @@ -379,13 +387,19 @@ class WebRtcCallViewModel : ViewModel() { WebRtcCallRepository.getIdentityRecords(recipient) { identityRecords -> if (identityRecords.isUntrusted(false) || identityRecords.isUnverified(false)) { val records = identityRecords.unverifiedRecords + identityRecords.untrustedRecords - events.postValue(CallEvent.ShowGroupCallSafetyNumberChange(records)) + emitEvent(CallEvent.ShowGroupCallSafetyNumberChange(records)) } else { - events.postValue(CallEvent.StartCall(isVideoCall)) + emitEvent(CallEvent.StartCall(isVideoCall)) } } } else { - events.postValue(CallEvent.StartCall(isVideoCall)) + emitEvent(CallEvent.StartCall(isVideoCall)) + } + } + + private fun emitEvent(callEvent: CallEvent) { + viewModelScope.launch { + events.emit(callEvent) } } @@ -402,7 +416,7 @@ class WebRtcCallViewModel : ViewModel() { } val newValue = (System.currentTimeMillis() - callConnectedTime) / 1000 - elapsed.onNext(newValue) + elapsed.update { newValue } elapsedTimeHandler.postDelayed(elapsedTimeRunnable, 1000) }