Add peek and join capabilities to call links implementation.

This commit is contained in:
Alex Hart
2023-06-01 13:53:23 -03:00
committed by Cody Henthorne
parent f8434bede5
commit 62940893f0
18 changed files with 455 additions and 16 deletions

View File

@@ -12,5 +12,10 @@ enum class CallLogFilter {
/**
* Only missed calls will be displayed
*/
MISSED
MISSED,
/**
* Only ad-hoc calls will be returned
*/
AD_HOC
}

View File

@@ -7,6 +7,7 @@ import org.signal.core.util.concurrent.SignalExecutors
import org.thoughtcrime.securesms.database.DatabaseObserver
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobs.CallLinkPeekJob
class CallLogRepository : CallLogPagedDataSource.CallRepository {
override fun getCallsCount(query: String?, filter: CallLogFilter): Int {
@@ -20,14 +21,14 @@ class CallLogRepository : CallLogPagedDataSource.CallRepository {
override fun getCallLinksCount(query: String?, filter: CallLogFilter): Int {
return when (filter) {
CallLogFilter.MISSED -> 0
CallLogFilter.ALL -> SignalDatabase.callLinks.getCallLinksCount(query)
CallLogFilter.ALL, CallLogFilter.AD_HOC -> SignalDatabase.callLinks.getCallLinksCount(query)
}
}
override fun getCallLinks(query: String?, filter: CallLogFilter, start: Int, length: Int): List<CallLogRow> {
return when (filter) {
CallLogFilter.MISSED -> emptyList()
CallLogFilter.ALL -> SignalDatabase.callLinks.getCallLinks(query, start, length)
CallLogFilter.ALL, CallLogFilter.AD_HOC -> SignalDatabase.callLinks.getCallLinks(query, start, length)
}
}
@@ -72,4 +73,29 @@ class CallLogRepository : CallLogPagedDataSource.CallRepository {
SignalDatabase.calls.deleteAllCallEventsExcept(selectedCallRowIds, missedOnly)
}.observeOn(Schedulers.io())
}
fun peekCallLinks(): Completable {
return Completable.fromAction {
val callLinks: List<CallLogRow.CallLink> = SignalDatabase.callLinks.getCallLinks(
query = null,
offset = 0,
limit = 10
)
val callEvents: List<CallLogRow.Call> = SignalDatabase.calls.getCalls(
offset = 0,
limit = 10,
searchTerm = null,
filter = CallLogFilter.AD_HOC
)
val recipients = (callLinks.map { it.recipient } + callEvents.map { it.peer }).toSet()
val jobs = recipients.take(10).map {
CallLinkPeekJob(it.id)
}
ApplicationDependencies.getJobManager().addAll(jobs)
}.observeOn(Schedulers.io())
}
}

View File

@@ -9,6 +9,7 @@ import org.thoughtcrime.securesms.database.CallLinkTable
import org.thoughtcrime.securesms.database.CallTable
import org.thoughtcrime.securesms.database.model.databaseprotos.GroupCallUpdateDetails
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.service.webrtc.CallLinkPeekInfo
import org.thoughtcrime.securesms.service.webrtc.links.CallLinkRoomId
/**
@@ -25,6 +26,7 @@ sealed class CallLogRow {
val record: CallLinkTable.CallLink,
val recipient: Recipient,
val searchQuery: String?,
val callLinkPeekInfo: CallLinkPeekInfo?,
override val id: Id = Id.CallLink(record.roomId)
) : CallLogRow()
@@ -38,6 +40,7 @@ sealed class CallLogRow {
val groupCallState: GroupCallState,
val children: Set<Long>,
val searchQuery: String?,
val callLinkPeekInfo: CallLinkPeekInfo?,
override val id: Id = Id.Call(children)
) : CallLogRow()

View File

@@ -4,14 +4,19 @@ import androidx.annotation.MainThread
import androidx.lifecycle.ViewModel
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.kotlin.plusAssign
import io.reactivex.rxjava3.processors.BehaviorProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import org.signal.paging.ObservablePagedData
import org.signal.paging.PagedData
import org.signal.paging.PagingConfig
import org.signal.paging.ProxyPagingController
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.util.FeatureFlags
import org.thoughtcrime.securesms.util.rx.RxStore
import java.util.concurrent.TimeUnit
/**
* ViewModel for call log management.
@@ -70,6 +75,22 @@ class CallLogViewModel(
disposables += callLogRepository.listenForChanges().subscribe {
controller.onDataInvalidated()
}
if (FeatureFlags.adHocCalling()) {
disposables += Observable
.interval(30, TimeUnit.SECONDS, Schedulers.computation())
.flatMapCompletable { callLogRepository.peekCallLinks() }
.subscribe()
disposables += ApplicationDependencies
.getSignalCallManager()
.peekInfoCache
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe {
controller.onDataInvalidated()
}
}
}
override fun onCleared() {

View File

@@ -185,7 +185,13 @@ class CallLinkTable(context: Context, databaseHelper: SignalDatabase) : Database
fun getCallLinks(query: String?, offset: Int, limit: Int): List<CallLogRow.CallLink> {
return queryCallLinks(query, offset, limit, false).readToList {
val callLink = CallLinkDeserializer.deserialize(it)
CallLogRow.CallLink(callLink, Recipient.resolved(callLink.recipientId), query)
val peer = Recipient.resolved(callLink.recipientId)
CallLogRow.CallLink(
record = callLink,
recipient = peer,
searchQuery = query,
callLinkPeekInfo = ApplicationDependencies.getSignalCallManager().peekInfoSnapshot[peer.id]
)
}
}

View File

@@ -814,6 +814,7 @@ class CallTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTabl
val filterClause: SqlUtil.Query = when (filter) {
CallLogFilter.ALL -> SqlUtil.buildQuery("$DELETION_TIMESTAMP = 0")
CallLogFilter.MISSED -> SqlUtil.buildQuery("$EVENT = ${Event.serialize(Event.MISSED)} AND $DELETION_TIMESTAMP = 0")
CallLogFilter.AD_HOC -> SqlUtil.buildQuery("$TYPE = ${Type.serialize(Type.AD_HOC_CALL)} AND $DELETION_TIMESTAMP = 0")
}
val queryClause: SqlUtil.Query = if (!searchTerm.isNullOrEmpty()) {
@@ -968,14 +969,16 @@ class CallTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTabl
.toSet()
val actualChildren = inPeriod.takeWhile { children.contains(it) }
val peer = Recipient.resolved(call.peer)
CallLogRow.Call(
record = call,
date = call.timestamp,
peer = Recipient.resolved(call.peer),
peer = peer,
groupCallState = CallLogRow.GroupCallState.fromDetails(groupCallDetails),
children = actualChildren.toSet(),
searchQuery = searchTerm
searchQuery = searchTerm,
callLinkPeekInfo = ApplicationDependencies.getSignalCallManager().peekInfoSnapshot[peer.id]
)
}
}

View File

@@ -0,0 +1,83 @@
/**
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JsonJobData
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.FeatureFlags
import java.util.concurrent.TimeUnit
/**
* PeekJob for refreshing call link data. Small lifespan, because these are only expected to run
* while the Calls Tab is in the foreground.
*
*
* While we may not necessarily require the weight of the job for this use-case, there are some nice
* properties around deduplication and lifetimes that jobs provide.
*/
internal class CallLinkPeekJob private constructor(
parameters: Parameters,
private val callLinkRecipientId: RecipientId
) : BaseJob(parameters) {
companion object {
private val TAG = Log.tag(CallLinkPeekJob::class.java)
const val KEY = "CallLinkPeekJob"
private const val KEY_CALL_LINK_RECIPIENT_ID = "call_link_recipient_id"
}
constructor(callLinkRecipientId: RecipientId) : this(
Parameters.Builder()
.setQueue(PushProcessMessageJob.getQueueName(callLinkRecipientId))
.setMaxInstancesForQueue(1)
.setLifespan(TimeUnit.MINUTES.toMillis(1))
.addConstraint(NetworkConstraint.KEY)
.build(),
callLinkRecipientId
)
override fun onRun() {
if (!FeatureFlags.adHocCalling()) {
Log.i(TAG, "Ad hoc calling is disabled. Dropping peek for call link.")
return
}
val recipient = Recipient.resolved(callLinkRecipientId)
if (!recipient.isCallLink) {
Log.w(TAG, "Recipient was not a call link. Ignoring.")
return
}
ApplicationDependencies.getSignalCallManager().peekCallLinkCall(callLinkRecipientId)
}
override fun onShouldRetry(e: Exception): Boolean = false
override fun serialize(): ByteArray? {
return JsonJobData.Builder()
.putString(KEY_CALL_LINK_RECIPIENT_ID, callLinkRecipientId.serialize())
.serialize()
}
override fun getFactoryKey(): String {
return KEY
}
override fun onFailure() = Unit
class Factory : Job.Factory<CallLinkPeekJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): CallLinkPeekJob {
val data = JsonJobData.deserialize(serializedData)
return CallLinkPeekJob(parameters, RecipientId.from(data.getString(KEY_CALL_LINK_RECIPIENT_ID)))
}
}
}

View File

@@ -97,6 +97,7 @@ public final class JobManagerFactories {
put(AvatarGroupsV1DownloadJob.KEY, new AvatarGroupsV1DownloadJob.Factory());
put(AvatarGroupsV2DownloadJob.KEY, new AvatarGroupsV2DownloadJob.Factory());
put(BoostReceiptRequestResponseJob.KEY, new BoostReceiptRequestResponseJob.Factory());
put(CallLinkPeekJob.KEY, new CallLinkPeekJob.Factory());
put("CallSyncEventJob", new FailingJob.Factory());
put(CallSyncEventJob.KEY, new CallSyncEventJob.Factory());
put(CheckServiceReachabilityJob.KEY, new CheckServiceReachabilityJob.Factory());

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.service.webrtc
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.service.webrtc.state.WebRtcServiceState
/**
* Process actions for when the call link has at least once been connected and joined.
*/
class CallLinkConnectedActionProcessor(
webRtcInteractor: WebRtcInteractor
) : GroupConnectedActionProcessor(webRtcInteractor) {
override fun getGroupNetworkUnavailableActionProcessor(): GroupNetworkUnavailableActionProcessor {
return CallLinkNetworkUnavailableActionProcessor(webRtcInteractor)
}
override fun handleGroupRequestUpdateMembers(currentState: WebRtcServiceState): WebRtcServiceState {
Log.i(tag, "handleGroupRequestUpdateMembers():")
return currentState
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.service.webrtc
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.service.webrtc.state.WebRtcServiceState
/**
* Process actions to go from lobby to a joined call link.
*/
class CallLinkJoiningActionProcessor(
webRtcInteractor: WebRtcInteractor
) : GroupJoiningActionProcessor(webRtcInteractor) {
override fun getGroupNetworkUnavailableActionProcessor(): GroupNetworkUnavailableActionProcessor {
return CallLinkNetworkUnavailableActionProcessor(webRtcInteractor)
}
override fun handleGroupRequestUpdateMembers(currentState: WebRtcServiceState): WebRtcServiceState {
Log.i(tag, "handleGroupRequestUpdateMembers():")
return currentState
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.service.webrtc
/**
* Processor which is utilized when the network becomes unavailable during a call link call. In general,
* this is triggered whenever there is a call ended, and the ending was not the result of direct user
* action.
*
* This class will check the network status when handlePreJoinCall is invoked, and transition to
* [CallLinkPreJoinActionProcessor] as network becomes available again.
*/
class CallLinkNetworkUnavailableActionProcessor(
webRtcInteractor: WebRtcInteractor
) : GroupNetworkUnavailableActionProcessor(webRtcInteractor) {
override fun createGroupPreJoinActionProcessor(): GroupPreJoinActionProcessor {
return CallLinkPreJoinActionProcessor(webRtcInteractor)
}
}

View File

@@ -0,0 +1,25 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.service.webrtc
import org.signal.ringrtc.CallId
import org.signal.ringrtc.PeekInfo
/**
* App-level peek info object for call links.
*/
data class CallLinkPeekInfo(
val callId: CallId?
) {
companion object {
@JvmStatic
fun fromPeekInfo(peekInfo: PeekInfo): CallLinkPeekInfo {
return CallLinkPeekInfo(
callId = peekInfo.eraId?.let { CallId.fromEra(it) }
)
}
}
}

View File

@@ -0,0 +1,105 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.service.webrtc
import org.signal.core.util.logging.Log
import org.signal.libsignal.zkgroup.GenericServerPublicParams
import org.signal.libsignal.zkgroup.InvalidInputException
import org.signal.libsignal.zkgroup.VerificationFailedException
import org.signal.libsignal.zkgroup.calllinks.CallLinkSecretParams
import org.signal.ringrtc.CallException
import org.signal.ringrtc.CallLinkRootKey
import org.thoughtcrime.securesms.database.SignalDatabase.Companion.callLinks
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies
import org.thoughtcrime.securesms.events.WebRtcViewModel
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.ringrtc.RemotePeer
import org.thoughtcrime.securesms.service.webrtc.RingRtcDynamicConfiguration.getAudioProcessingMethod
import org.thoughtcrime.securesms.service.webrtc.state.WebRtcServiceState
import org.thoughtcrime.securesms.util.NetworkUtil
import java.io.IOException
/**
* Process actions while the user is in the pre-join lobby for the call link.
*/
class CallLinkPreJoinActionProcessor(
webRtcInteractor: WebRtcInteractor
) : GroupPreJoinActionProcessor(webRtcInteractor) {
companion object {
private val TAG = Log.tag(CallLinkPreJoinActionProcessor::class.java)
}
override fun handlePreJoinCall(currentState: WebRtcServiceState, remotePeer: RemotePeer): WebRtcServiceState {
Log.i(TAG, "handlePreJoinCall():")
val groupCall = try {
val callLink = callLinks.getCallLinkByRoomId(remotePeer.recipient.requireCallLinkRoomId())
if (callLink?.credentials == null) {
return groupCallFailure(currentState, "No access to this call link.", Exception())
}
val callLinkRootKey = CallLinkRootKey(callLink.credentials.linkKeyBytes)
val callLinkSecretParams = CallLinkSecretParams.deriveFromRootKey(callLink.credentials.linkKeyBytes)
val genericServerPublicParams = GenericServerPublicParams(
ApplicationDependencies.getSignalServiceNetworkAccess()
.getConfiguration()
.genericServerPublicParams
)
val callLinkAuthCredentialPresentation = ApplicationDependencies
.getGroupsV2Authorization()
.getCallLinkAuthorizationForToday(genericServerPublicParams, callLinkSecretParams)
webRtcInteractor.callManager.createCallLinkCall(
SignalStore.internalValues().groupCallingServer(),
callLinkAuthCredentialPresentation.serialize(),
callLinkRootKey,
callLink.credentials.adminPassBytes,
ByteArray(0),
AUDIO_LEVELS_INTERVAL,
getAudioProcessingMethod(),
webRtcInteractor.groupCallObserver
)
} catch (e: InvalidInputException) {
return groupCallFailure(currentState, "Failed to create server public parameters.", e)
} catch (e: IOException) {
return groupCallFailure(currentState, "Failed to get call link authorization", e)
} catch (e: VerificationFailedException) {
return groupCallFailure(currentState, "Failed to get call link authorization", e)
} catch (e: CallException) {
return groupCallFailure(currentState, "Failed to parse call link root key", e)
} ?: return groupCallFailure(currentState, "Failed to create group call object for call link.", Exception())
try {
groupCall.setOutgoingAudioMuted(true)
groupCall.setOutgoingVideoMuted(true)
groupCall.setDataMode(NetworkUtil.getCallingDataMode(context, groupCall.localDeviceState.networkRoute.localAdapterType))
Log.i(TAG, "Connecting to group call: " + currentState.callInfoState.callRecipient.id)
groupCall.connect()
} catch (e: CallException) {
return groupCallFailure(currentState, "Unable to connect to call link", e)
}
SignalStore.tooltips().markGroupCallingLobbyEntered()
return currentState.builder()
.changeCallInfoState()
.groupCall(groupCall)
.groupCallState(WebRtcViewModel.GroupCallState.DISCONNECTED)
.activePeer(RemotePeer(currentState.callInfoState.callRecipient.id, RemotePeer.GROUP_CALL_ID))
.build()
}
override fun handleGroupRequestUpdateMembers(currentState: WebRtcServiceState): WebRtcServiceState {
Log.i(tag, "handleGroupRequestUpdateMembers():")
return currentState
}
override fun getGroupNetworkUnavailableActionProcessor(): GroupNetworkUnavailableActionProcessor {
return CallLinkNetworkUnavailableActionProcessor(webRtcInteractor)
}
}

View File

@@ -295,7 +295,7 @@ public class GroupActionProcessor extends DeviceAwareActionProcessor {
VideoState videoState = currentState.getVideoState();
currentState = terminateGroupCall(currentState, false).builder()
.actionProcessor(new GroupNetworkUnavailableActionProcessor(webRtcInteractor))
.actionProcessor(getGroupNetworkUnavailableActionProcessor())
.changeVideoState()
.eglBase(videoState.getLockableEglBase())
.camera(videoState.getCamera())
@@ -321,4 +321,8 @@ public class GroupActionProcessor extends DeviceAwareActionProcessor {
return terminateGroupCall(currentState);
}
public @NonNull GroupNetworkUnavailableActionProcessor getGroupNetworkUnavailableActionProcessor() {
return new GroupNetworkUnavailableActionProcessor(webRtcInteractor);
}
}

View File

@@ -22,7 +22,7 @@ import org.thoughtcrime.securesms.service.webrtc.state.WebRtcServiceState;
* This class will check the network status when handlePreJoinCall is invoked, and transition to
* GroupPreJoinActionProcessor as network becomes available again.
*/
class GroupNetworkUnavailableActionProcessor extends WebRtcActionProcessor {
public class GroupNetworkUnavailableActionProcessor extends WebRtcActionProcessor {
private static final String TAG = Log.tag(GroupNetworkUnavailableActionProcessor.class);
@@ -38,7 +38,7 @@ class GroupNetworkUnavailableActionProcessor extends WebRtcActionProcessor {
NetworkInfo activeNetworkInfo = connectivityManager.getActiveNetworkInfo();
if (activeNetworkInfo != null && activeNetworkInfo.isConnected()) {
GroupPreJoinActionProcessor processor = new GroupPreJoinActionProcessor(webRtcInteractor);
GroupPreJoinActionProcessor processor = createGroupPreJoinActionProcessor();
return processor.handlePreJoinCall(currentState.builder().actionProcessor(processor).build(), remotePeer);
}
@@ -72,7 +72,7 @@ class GroupNetworkUnavailableActionProcessor extends WebRtcActionProcessor {
public @NonNull WebRtcServiceState handleNetworkChanged(@NonNull WebRtcServiceState currentState, boolean available) {
if (available) {
return currentState.builder()
.actionProcessor(new GroupPreJoinActionProcessor(webRtcInteractor))
.actionProcessor(createGroupPreJoinActionProcessor())
.changeCallInfoState()
.callState(WebRtcViewModel.State.CALL_PRE_JOIN)
.build();
@@ -80,4 +80,8 @@ class GroupNetworkUnavailableActionProcessor extends WebRtcActionProcessor {
return currentState;
}
}
protected @NonNull GroupPreJoinActionProcessor createGroupPreJoinActionProcessor() {
return new GroupPreJoinActionProcessor(webRtcInteractor);
}
}

View File

@@ -207,7 +207,7 @@ public class GroupPreJoinActionProcessor extends GroupActionProcessor {
public @NonNull WebRtcServiceState handleNetworkChanged(@NonNull WebRtcServiceState currentState, boolean available) {
if (!available) {
return currentState.builder()
.actionProcessor(new GroupNetworkUnavailableActionProcessor(webRtcInteractor))
.actionProcessor(getGroupNetworkUnavailableActionProcessor())
.changeCallInfoState()
.callState(WebRtcViewModel.State.NETWORK_FAILURE)
.build();

View File

@@ -65,9 +65,16 @@ public class IdleActionProcessor extends WebRtcActionProcessor {
protected @NonNull WebRtcServiceState handlePreJoinCall(@NonNull WebRtcServiceState currentState, @NonNull RemotePeer remotePeer) {
Log.i(TAG, "handlePreJoinCall():");
boolean isGroupCall = remotePeer.getRecipient().isPushV2Group() || remotePeer.getRecipient().isCallLink();
WebRtcActionProcessor processor = isGroupCall ? new GroupPreJoinActionProcessor(webRtcInteractor)
: new PreJoinActionProcessor(webRtcInteractor);
boolean isGroupCall = remotePeer.getRecipient().isPushV2Group() || remotePeer.getRecipient().isCallLink();
final WebRtcActionProcessor processor;
if (remotePeer.getRecipient().isCallLink()) {
processor = new CallLinkPreJoinActionProcessor(webRtcInteractor);
} else if (remotePeer.getRecipient().isPushV2Group()) {
processor = new GroupPreJoinActionProcessor(webRtcInteractor);
} else {
processor = new PreJoinActionProcessor(webRtcInteractor);
}
currentState = WebRtcVideoUtil.initializeVanityCamera(WebRtcVideoUtil.initializeVideo(context,
webRtcInteractor.getCameraEventListener(),

View File

@@ -16,11 +16,15 @@ import org.greenrobot.eventbus.EventBus;
import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
import org.signal.libsignal.protocol.util.Pair;
import org.signal.libsignal.zkgroup.GenericServerPublicParams;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.VerificationFailedException;
import org.signal.libsignal.zkgroup.calllinks.CallLinkAuthCredentialPresentation;
import org.signal.libsignal.zkgroup.calllinks.CallLinkSecretParams;
import org.signal.libsignal.zkgroup.groups.GroupIdentifier;
import org.signal.ringrtc.CallException;
import org.signal.ringrtc.CallId;
import org.signal.ringrtc.CallLinkRootKey;
import org.signal.ringrtc.CallManager;
import org.signal.ringrtc.GroupCall;
import org.signal.ringrtc.HttpHeader;
@@ -30,6 +34,7 @@ import org.signal.ringrtc.Remote;
import org.signal.storageservice.protos.groups.GroupExternalCredential;
import org.thoughtcrime.securesms.WebRtcCallActivity;
import org.thoughtcrime.securesms.crypto.UnidentifiedAccessUtil;
import org.thoughtcrime.securesms.database.CallLinkTable;
import org.thoughtcrime.securesms.database.CallTable;
import org.thoughtcrime.securesms.database.GroupTable;
import org.thoughtcrime.securesms.database.SignalDatabase;
@@ -52,11 +57,13 @@ import org.thoughtcrime.securesms.recipients.RecipientUtil;
import org.thoughtcrime.securesms.ringrtc.CameraEventListener;
import org.thoughtcrime.securesms.ringrtc.CameraState;
import org.thoughtcrime.securesms.ringrtc.RemotePeer;
import org.thoughtcrime.securesms.service.webrtc.links.CallLinkRoomId;
import org.thoughtcrime.securesms.service.webrtc.links.SignalCallLinkManager;
import org.thoughtcrime.securesms.service.webrtc.state.WebRtcEphemeralState;
import org.thoughtcrime.securesms.service.webrtc.state.WebRtcServiceState;
import org.thoughtcrime.securesms.util.AppForegroundObserver;
import org.thoughtcrime.securesms.util.BubbleUtil;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.thoughtcrime.securesms.util.RecipientAccessList;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.util.Util;
@@ -80,8 +87,10 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos.SyncMe
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -123,12 +132,15 @@ public final class SignalCallManager implements CallManager.Observer, GroupCall.
private RxStore<WebRtcEphemeralState> ephemeralStateStore;
private boolean needsToSetSelfUuid = true;
private RxStore<Map<RecipientId, CallLinkPeekInfo>> linkPeekInfoStore;
public SignalCallManager(@NonNull Application application) {
this.context = application.getApplicationContext();
this.lockManager = new LockManager(this.context);
this.serviceExecutor = Executors.newSingleThreadExecutor();
this.networkExecutor = Executors.newSingleThreadExecutor();
this.ephemeralStateStore = new RxStore<>(new WebRtcEphemeralState(), Schedulers.from(serviceExecutor));
this.linkPeekInfoStore = new RxStore<>(new HashMap<>(), Schedulers.from(serviceExecutor));
CallManager callManager = null;
try {
@@ -159,6 +171,14 @@ public final class SignalCallManager implements CallManager.Observer, GroupCall.
return lockManager;
}
public @NonNull Flowable<Map<RecipientId, CallLinkPeekInfo>> getPeekInfoCache() {
return linkPeekInfoStore.getStateFlowable();
}
public @NonNull Map<RecipientId, CallLinkPeekInfo> getPeekInfoSnapshot() {
return linkPeekInfoStore.getState();
}
private void process(@NonNull ProcessAction action) {
Throwable t = new Throwable();
String caller = t.getStackTrace().length > 1 ? t.getStackTrace()[1].getMethodName() : "unknown";
@@ -334,6 +354,59 @@ private void processStateless(@NonNull Function1<WebRtcEphemeralState, WebRtcEph
process((s, p) -> p.handleDropCall(s, callId));
}
public void peekCallLinkCall(@NonNull RecipientId id) {
if (callManager == null) {
Log.i(TAG, "Unable to peekCallLinkCall, call manager is null");
return;
}
if (!FeatureFlags.adHocCalling()) {
Log.i(TAG, "Ad Hoc Calling is disabled. Ignoring request to peek.");
return;
}
networkExecutor.execute(() -> {
try {
Recipient callLinkRecipient = Recipient.resolved(id);
CallLinkRoomId callLinkRoomId = callLinkRecipient.requireCallLinkRoomId();
CallLinkTable.CallLink callLink = SignalDatabase.callLinks().getCallLinkByRoomId(callLinkRoomId);
if (callLink == null || callLink.getCredentials() == null) {
Log.w(TAG, "Cannot peek call link without credentials.");
return;
}
CallLinkRootKey callLinkRootKey = new CallLinkRootKey(callLink.getCredentials().getLinkKeyBytes());
GenericServerPublicParams genericServerPublicParams = new GenericServerPublicParams(ApplicationDependencies.getSignalServiceNetworkAccess()
.getConfiguration()
.getGenericServerPublicParams());
CallLinkAuthCredentialPresentation callLinkAuthCredentialPresentation = ApplicationDependencies.getGroupsV2Authorization()
.getCallLinkAuthorizationForToday(
genericServerPublicParams,
CallLinkSecretParams.deriveFromRootKey(callLinkRootKey.getKeyBytes())
);
callManager.peekCallLinkCall(SignalStore.internalValues().groupCallingServer(), callLinkAuthCredentialPresentation.serialize(), callLinkRootKey, peekInfo -> {
PeekInfo info = peekInfo.getValue();
if (info == null) {
Log.w(TAG, "Failed to get peek info: " + peekInfo.getStatus());
return;
}
linkPeekInfoStore.update(store -> {
Map<RecipientId, CallLinkPeekInfo> newHashMap = new HashMap<>(store);
newHashMap.put(id, CallLinkPeekInfo.fromPeekInfo(info));
return newHashMap;
});
});
} catch (CallException | VerificationFailedException | InvalidInputException | IOException e) {
Log.i(TAG, "error peeking call link", e);
}
});
}
public void peekGroupCall(@NonNull RecipientId id) {
if (callManager == null) {
Log.i(TAG, "Unable to peekGroupCall, call manager is null");
@@ -349,7 +422,6 @@ private void processStateless(@NonNull Function1<WebRtcEphemeralState, WebRtcEph
List<GroupCall.GroupMemberInfo> members = Stream.of(GroupManager.getUuidCipherTexts(context, groupId))
.map(entry -> new GroupCall.GroupMemberInfo(entry.getKey(), entry.getValue().serialize()))
.toList();
callManager.peekGroupCall(SignalStore.internalValues().groupCallingServer(), credential.getTokenBytes().toByteArray(), members, peekInfo -> {
Long threadId = SignalDatabase.threads().getThreadIdFor(group.getId());
@@ -541,7 +613,7 @@ private void processStateless(@NonNull Function1<WebRtcEphemeralState, WebRtcEph
process((s, p) -> p.handleNetworkRouteChanged(s, networkRoute));
}
@Override
@Override
public void onAudioLevels(Remote remote, int capturedLevel, int receivedLevel) {
processStateless(s -> serviceState.getActionProcessor().handleAudioLevelsChanged(serviceState, s, capturedLevel, receivedLevel));
}