Add CallLink Observed event and handling.

This commit is contained in:
Alex Hart
2024-06-06 10:39:50 -03:00
parent d3eb480d31
commit f572eb5322
7 changed files with 160 additions and 40 deletions

View File

@@ -10,6 +10,7 @@ import org.signal.core.util.SqlUtil
import org.signal.core.util.count
import org.signal.core.util.delete
import org.signal.core.util.deleteAll
import org.signal.core.util.exists
import org.signal.core.util.flatten
import org.signal.core.util.insertInto
import org.signal.core.util.isAbsent
@@ -580,6 +581,22 @@ class CallTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTabl
AppDependencies.databaseObserver.notifyCallUpdateObservers()
}
fun insertOrUpdateAdHocCallFromObserveEvent(
callRecipient: Recipient,
timestamp: Long,
callId: Long
) {
handleCallLinkUpdate(callRecipient, timestamp, CallId(callId), Direction.INCOMING)
}
fun insertAdHocCallFromObserveEvent(
callRecipient: Recipient,
timestamp: Long,
eraId: String
): Boolean {
return handleCallLinkUpdate(callRecipient, timestamp, CallId.fromEra(eraId), Direction.INCOMING)
}
fun insertOrUpdateGroupCallFromExternalEvent(
groupRecipientId: RecipientId,
sender: RecipientId,
@@ -606,7 +623,7 @@ class CallTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTabl
) {
val recipient = Recipient.resolved(groupRecipientId)
if (recipient.isCallLink) {
handleCallLinkUpdate(recipient, timestamp, peekGroupCallEraId)
handleCallLinkUpdate(recipient, timestamp, peekGroupCallEraId?.let { CallId.fromEra(it) })
} else {
handleGroupUpdate(recipient, sender, timestamp, peekGroupCallEraId, peekJoinedUuids, isCallFull)
}
@@ -676,35 +693,53 @@ class CallTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTabl
}
}
/**
* @return Whether or not a new row was inserted.
*/
private fun handleCallLinkUpdate(
callLinkRecipient: Recipient,
timestamp: Long,
peekGroupCallEraId: String?
) {
callId: CallId?,
direction: Direction = Direction.OUTGOING,
skipTimestampUpdate: Boolean = false
): Boolean {
check(callLinkRecipient.isCallLink)
val callId = peekGroupCallEraId?.let { CallId.fromEra(it).longValue() } ?: return
writableDatabase.withinTransaction { db ->
db.delete(TABLE_NAME)
.where("$PEER = ?", callLinkRecipient.id.serialize())
.run()
db.insertInto(TABLE_NAME)
.values(
CALL_ID to callId,
MESSAGE_ID to null,
PEER to callLinkRecipient.id.toLong(),
EVENT to Event.serialize(Event.GENERIC_GROUP_CALL),
TYPE to Type.serialize(Type.AD_HOC_CALL),
DIRECTION to Direction.serialize(Direction.OUTGOING),
TIMESTAMP to timestamp,
RINGER to null
).run(SQLiteDatabase.CONFLICT_ABORT)
if (callId == null) {
return false
}
val didInsert = writableDatabase.withinTransaction { db ->
val exists = db.exists(TABLE_NAME)
.where("$PEER = ? AND $CALL_ID = ?", callLinkRecipient.id.serialize(), callId)
.run()
if (exists && !skipTimestampUpdate) {
db.update(TABLE_NAME)
.values(TIMESTAMP to timestamp)
.where("$PEER = ? AND $CALL_ID = ? AND $TIMESTAMP < ?", callLinkRecipient.id.serialize(), callId, timestamp)
.run()
false
} else if (!exists) {
db.insertInto(TABLE_NAME)
.values(
CALL_ID to callId,
MESSAGE_ID to null,
PEER to callLinkRecipient.id.toLong(),
EVENT to Event.serialize(Event.GENERIC_GROUP_CALL),
TYPE to Type.serialize(Type.AD_HOC_CALL),
DIRECTION to Direction.serialize(direction),
TIMESTAMP to timestamp,
RINGER to null
).run(SQLiteDatabase.CONFLICT_ABORT)
Log.d(TAG, "Inserted new call event for call link. Call Id: $callId")
true
} else false
}
Log.d(TAG, "Inserted new call event for call link. Call Id: $callId")
AppDependencies.databaseObserver.notifyCallUpdateObservers()
return didInsert
}
private fun insertCallEventFromGroupUpdate(
@@ -1648,7 +1683,7 @@ class CallTable(context: Context, databaseHelper: SignalDatabase) : DatabaseTabl
@JvmStatic
fun from(event: CallEvent.Event?): Event? {
return when (event) {
null, CallEvent.Event.UNKNOWN_ACTION -> null
null, CallEvent.Event.UNKNOWN_ACTION, CallEvent.Event.OBSERVED -> null
CallEvent.Event.ACCEPTED -> ACCEPTED
CallEvent.Event.NOT_ACCEPTED -> NOT_ACCEPTED
CallEvent.Event.DELETE -> DELETE

View File

@@ -40,7 +40,7 @@ class CallSyncEventJob private constructor(
recipientId = conversationRecipientId.toLong(),
callId = callId,
direction = CallTable.Direction.serialize(if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING),
event = CallTable.Event.serialize(CallTable.Event.ACCEPTED)
callEvent = CallSyncEventJobRecord.Event.ACCEPTED
)
)
)
@@ -55,7 +55,22 @@ class CallSyncEventJob private constructor(
recipientId = conversationRecipientId.toLong(),
callId = callId,
direction = CallTable.Direction.serialize(if (isIncoming) CallTable.Direction.INCOMING else CallTable.Direction.OUTGOING),
event = CallTable.Event.serialize(CallTable.Event.NOT_ACCEPTED)
callEvent = CallSyncEventJobRecord.Event.NOT_ACCEPTED
)
)
)
}
@JvmStatic
fun createForObserved(conversationRecipientId: RecipientId, callId: Long): CallSyncEventJob {
return CallSyncEventJob(
getParameters(),
listOf(
CallSyncEventJobRecord(
recipientId = conversationRecipientId.toLong(),
callId = callId,
direction = CallTable.Direction.serialize(CallTable.Direction.INCOMING),
callEvent = CallSyncEventJobRecord.Event.OBSERVED
)
)
)
@@ -69,7 +84,7 @@ class CallSyncEventJob private constructor(
recipientId = it.peer.toLong(),
callId = it.callId,
direction = CallTable.Direction.serialize(it.direction),
event = CallTable.Event.serialize(CallTable.Event.DELETE)
callEvent = CallSyncEventJobRecord.Event.DELETE
)
}
)
@@ -136,29 +151,36 @@ class CallSyncEventJob private constructor(
}
private fun createSyncMessage(syncTimestamp: Long, callSyncEvent: CallSyncEventJobRecord, callType: CallTable.Type): SyncMessage.CallEvent {
return when (callSyncEvent.deserializeEvent()) {
CallTable.Event.ACCEPTED -> CallEventSyncMessageUtil.createAcceptedSyncMessage(
return when (callSyncEvent.resolveEvent()) {
CallSyncEventJobRecord.Event.ACCEPTED -> CallEventSyncMessageUtil.createAcceptedSyncMessage(
remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)),
timestamp = syncTimestamp,
isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING,
isVideoCall = callType != CallTable.Type.AUDIO_CALL
)
CallTable.Event.NOT_ACCEPTED -> CallEventSyncMessageUtil.createNotAcceptedSyncMessage(
CallSyncEventJobRecord.Event.NOT_ACCEPTED -> CallEventSyncMessageUtil.createNotAcceptedSyncMessage(
remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)),
timestamp = syncTimestamp,
isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING,
isVideoCall = callType != CallTable.Type.AUDIO_CALL
)
CallTable.Event.DELETE -> CallEventSyncMessageUtil.createDeleteCallEvent(
CallSyncEventJobRecord.Event.DELETE -> CallEventSyncMessageUtil.createDeleteCallEvent(
remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)),
timestamp = syncTimestamp,
isOutgoing = callSyncEvent.deserializeDirection() == CallTable.Direction.OUTGOING,
isVideoCall = callType != CallTable.Type.AUDIO_CALL
)
else -> throw Exception("Unsupported event: ${callSyncEvent.event}")
CallSyncEventJobRecord.Event.OBSERVED -> CallEventSyncMessageUtil.createObservedCallEvent(
remotePeer = RemotePeer(callSyncEvent.deserializeRecipientId(), CallId(callSyncEvent.callId)),
timestamp = syncTimestamp,
isOutgoing = false,
isVideoCall = callType != CallTable.Type.AUDIO_CALL
)
else -> throw Exception("Unsupported event: ${callSyncEvent.deprecatedEvent}")
}
}
@@ -166,7 +188,18 @@ class CallSyncEventJob private constructor(
private fun CallSyncEventJobRecord.deserializeDirection(): CallTable.Direction = CallTable.Direction.deserialize(direction)
private fun CallSyncEventJobRecord.deserializeEvent(): CallTable.Event = CallTable.Event.deserialize(event)
private fun CallSyncEventJobRecord.resolveEvent(): CallSyncEventJobRecord.Event {
return if (callEvent != CallSyncEventJobRecord.Event.UNKNOWN_ACTION) {
callEvent
} else {
when (CallTable.Event.deserialize(deprecatedEvent)) {
CallTable.Event.ACCEPTED -> CallSyncEventJobRecord.Event.ACCEPTED
CallTable.Event.NOT_ACCEPTED -> CallSyncEventJobRecord.Event.NOT_ACCEPTED
CallTable.Event.DELETE -> CallSyncEventJobRecord.Event.DELETE
else -> CallSyncEventJobRecord.Event.UNKNOWN_ACTION
}
}
}
class Factory : Job.Factory<CallSyncEventJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): CallSyncEventJob {

View File

@@ -1380,6 +1380,23 @@ object SyncMessageProcessor {
val event: CallTable.Event? = CallTable.Event.from(callEvent.event)
val hasConversationId: Boolean = callEvent.conversationId != null
if (hasConversationId && type == CallTable.Type.AD_HOC_CALL && callEvent.event == SyncMessage.CallEvent.Event.OBSERVED && direction != null) {
log(envelopeTimestamp, "Handling OBSERVED ad-hoc calling event")
if (direction == CallTable.Direction.OUTGOING) {
warn("Received an OBSERVED sync message for an outgoing event. Dropping.")
return
}
val recipient = resolveCallLinkRecipient(callEvent)
SignalDatabase.calls.insertOrUpdateAdHocCallFromObserveEvent(
callRecipient = recipient,
timestamp = callEvent.timestamp!!,
callId = callId
)
return
}
if (timestamp == 0L || type == null || direction == null || event == null || !hasConversationId) {
warn(envelopeTimestamp, "Group/Ad-hoc call event sync message is not valid, ignoring. timestamp: $timestamp type: $type direction: $direction event: $event hasPeer: $hasConversationId")
return
@@ -1387,9 +1404,7 @@ object SyncMessageProcessor {
val recipient: Recipient? = when (type) {
CallTable.Type.AD_HOC_CALL -> {
val callLinkRoomId = CallLinkRoomId.fromBytes(callEvent.conversationId!!.toByteArray())
val callLink = SignalDatabase.callLinks.getOrCreateCallLinkByRoomId(callLinkRoomId)
Recipient.resolved(callLink.recipientId)
resolveCallLinkRecipient(callEvent)
}
CallTable.Type.GROUP_CALL -> {
val groupId: GroupId = GroupId.push(callEvent.conversationId!!.toByteArray())
@@ -1453,6 +1468,12 @@ object SyncMessageProcessor {
}
}
private fun resolveCallLinkRecipient(callEvent: SyncMessage.CallEvent): Recipient {
val callLinkRoomId = CallLinkRoomId.fromBytes(callEvent.conversationId!!.toByteArray())
val callLink = SignalDatabase.callLinks.getOrCreateCallLinkByRoomId(callLinkRoomId)
return Recipient.resolved(callLink.recipientId)
}
private fun handleSynchronizeDeleteForMe(context: Context, deleteForMe: SyncMessage.DeleteForMe, envelopeTimestamp: Long, earlyMessageCacheEntry: EarlyMessageCacheEntry?) {
if (!FeatureFlags.deleteSyncEnabled()) {
warn(envelopeTimestamp, "Delete for me sync message dropped as support not enabled")

View File

@@ -47,6 +47,18 @@ object CallEventSyncMessageUtil {
)
}
@JvmStatic
fun createObservedCallEvent(remotePeer: RemotePeer, timestamp: Long, isOutgoing: Boolean, isVideoCall: Boolean): CallEvent {
return createCallEvent(
remotePeer.id,
remotePeer.callId.longValue(),
timestamp,
isOutgoing,
isVideoCall,
CallEvent.Event.OBSERVED
)
}
private fun createCallEvent(
recipientId: RecipientId,
callId: Long,

View File

@@ -424,6 +424,14 @@ public final class SignalCallManager implements CallManager.Observer, GroupCall.
return;
}
String eraId = info.getEraId();
if (eraId != null && !info.getJoinedMembers().isEmpty()) {
if (SignalDatabase.calls().insertAdHocCallFromObserveEvent(callLinkRecipient, System.currentTimeMillis(), eraId)) {
AppDependencies.getJobManager()
.add(CallSyncEventJob.createForObserved(callLinkRecipient.getId(), CallId.fromEra(eraId).longValue()));
}
}
linkPeekInfoStore.update(store -> {
Map<RecipientId, CallLinkPeekInfo> newHashMap = new HashMap<>(store);
newHashMap.put(id, CallLinkPeekInfo.fromPeekInfo(info));