Update LibSignalChatConnection to use new ChatConnection API rather than ChatService

This commit is contained in:
andrew-signal
2025-02-04 14:34:07 -05:00
committed by Greyson Parrelli
parent fe44789d88
commit 2186e2bf92
5 changed files with 218 additions and 253 deletions

View File

@@ -12,14 +12,16 @@ import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.SingleSubject
import okio.ByteString
import okio.ByteString.Companion.toByteString
import okio.withLock
import org.signal.core.util.logging.Log
import org.signal.libsignal.net.AuthenticatedChatService
import org.signal.libsignal.net.ChatListener
import org.signal.libsignal.net.ChatService
import org.signal.libsignal.internal.CompletableFuture
import org.signal.libsignal.net.AuthenticatedChatConnection
import org.signal.libsignal.net.ChatConnection
import org.signal.libsignal.net.ChatConnectionListener
import org.signal.libsignal.net.ChatServiceException
import org.signal.libsignal.net.DeviceDeregisteredException
import org.signal.libsignal.net.Network
import org.signal.libsignal.net.UnauthenticatedChatService
import org.signal.libsignal.net.UnauthenticatedChatConnection
import org.whispersystems.signalservice.api.util.CredentialsProvider
import org.whispersystems.signalservice.api.websocket.HealthMonitor
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
@@ -35,21 +37,20 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.time.Duration.Companion.seconds
import org.signal.libsignal.net.ChatService.Request as LibSignalRequest
import org.signal.libsignal.net.ChatService.Response as LibSignalResponse
import org.signal.libsignal.net.ChatConnection.Request as LibSignalRequest
import org.signal.libsignal.net.ChatConnection.Response as LibSignalResponse
/**
* Implements the WebSocketConnection interface via libsignal-net
*
* Notable implementation choices:
* - [chatService] contains both the authenticated and unauthenticated connections,
* which one to use for [sendRequest]/[sendResponse] is based on [isAuthenticated].
* - keep-alive requests always use the [org.signal.libsignal.net.ChatService.unauthenticatedSendAndDebug]
* API, and log the debug info on success.
* - regular sends use [org.signal.libsignal.net.ChatService.unauthenticatedSend] and don't create any overhead.
* - [chatConnection] contains either an authenticated or an unauthenticated connections
* - keep-alive requests are sent on both authenticated and unauthenticated connections, mirroring the existing OkHttp behavior
* - [org.whispersystems.signalservice.api.websocket.WebSocketConnectionState] reporting is implemented
* as close as possible to the original implementation in
* [org.whispersystems.signalservice.internal.websocket.OkHttpWebSocketConnection].
* - we expose fake "psuedo IDs" for incoming requests so the layer on top of ours can work with IDs, just
* like with the old OkHttp implementation, and internally we map these IDs to AckSenders
*/
class LibSignalChatConnection(
name: String,
@@ -73,10 +74,10 @@ class LibSignalChatConnection(
// tell us to send a response for that ID, and then we use the pseudo ID as a handle to find
// the callback given to us earlier by libsignal-net, and we call that callback.
private val nextIncomingMessageInternalPseudoId = AtomicLong(1)
val ackSenderForInternalPseudoId = ConcurrentHashMap<Long, ChatListener.ServerMessageAck>()
val ackSenderForInternalPseudoId = ConcurrentHashMap<Long, ChatConnectionListener.ServerMessageAck>()
private val CHAT_SERVICE_LOCK = ReentrantLock()
private var chatService: ChatService? = null
private var chatConnection: ChatConnection? = null
companion object {
const val SERVICE_ENVELOPE_REQUEST_VERB = "PUT"
@@ -142,23 +143,38 @@ class LibSignalChatConnection(
// There's no sense in resetting nextIncomingMessageInternalPseudoId.
}
init {
if (credentialsProvider != null) {
check(!credentialsProvider.username.isNullOrEmpty())
check(!credentialsProvider.password.isNullOrEmpty())
}
}
override fun connect(): Observable<WebSocketConnectionState> {
CHAT_SERVICE_LOCK.withLock {
if (chatService != null) {
if (!isDead()) {
return state
}
Log.i(TAG, "$name Connecting...")
chatService = network.createChatService(credentialsProvider, receiveStories, listener).apply {
state.onNext(WebSocketConnectionState.CONNECTING)
connect().whenComplete(
onSuccess = { debugInfo ->
val chatConnectionFuture: CompletableFuture<out ChatConnection> = if (credentialsProvider == null) {
network.connectUnauthChat(listener)
} else {
network.connectAuthChat(credentialsProvider.username, credentialsProvider.password, receiveStories, listener)
}
state.onNext(WebSocketConnectionState.CONNECTING)
chatConnectionFuture.whenComplete(
onSuccess = { connection ->
CHAT_SERVICE_LOCK.withLock {
chatConnection = connection
connection?.start()
Log.i(TAG, "$name Connected")
Log.d(TAG, "$name $debugInfo")
state.onNext(WebSocketConnectionState.CONNECTED)
},
onFailure = { throwable ->
}
},
onFailure = { throwable ->
CHAT_SERVICE_LOCK.withLock {
Log.w(TAG, "$name [connect] Failure:", throwable)
chatConnection = null
// Internally, libsignal-net will throw this DeviceDeregisteredException when the HTTP CONNECT
// request returns HTTP 403.
// The chat service currently does not return HTTP 401 on /v1/websocket.
@@ -169,27 +185,38 @@ class LibSignalChatConnection(
state.onNext(WebSocketConnectionState.FAILED)
}
}
)
}
}
)
return state
}
}
override fun isDead(): Boolean {
CHAT_SERVICE_LOCK.withLock {
return chatService == null
return when (state.value) {
WebSocketConnectionState.DISCONNECTED,
WebSocketConnectionState.DISCONNECTING,
WebSocketConnectionState.FAILED,
WebSocketConnectionState.AUTHENTICATION_FAILED -> true
WebSocketConnectionState.CONNECTING,
WebSocketConnectionState.CONNECTED,
WebSocketConnectionState.RECONNECTING -> false
null -> throw IllegalStateException("LibSignalChatConnection.state can never be null")
}
}
}
override fun disconnect() {
CHAT_SERVICE_LOCK.withLock {
if (chatService == null) {
if (isDead()) {
return
}
Log.i(TAG, "$name Disconnecting...")
state.onNext(WebSocketConnectionState.DISCONNECTING)
chatService!!.disconnect()
chatConnection!!.disconnect()
.whenComplete(
onSuccess = {
Log.i(TAG, "$name Disconnected")
@@ -200,18 +227,18 @@ class LibSignalChatConnection(
state.onNext(WebSocketConnectionState.DISCONNECTED)
}
)
chatService = null
chatConnection = null
}
}
override fun sendRequest(request: WebSocketRequestMessage): Single<WebsocketResponse> {
CHAT_SERVICE_LOCK.withLock {
if (chatService == null) {
if (isDead()) {
return Single.error(IOException("$name is closed!"))
}
val single = SingleSubject.create<WebsocketResponse>()
val internalRequest = request.toLibSignalRequest()
chatService!!.send(internalRequest)
chatConnection!!.send(internalRequest)
.whenComplete(
onSuccess = { response ->
Log.d(TAG, "$name [sendRequest] Success: ${response!!.status}")
@@ -219,13 +246,13 @@ class LibSignalChatConnection(
in 400..599 -> {
healthMonitor.onMessageError(
status = response.status,
isIdentifiedWebSocket = chatService is AuthenticatedChatService
isIdentifiedWebSocket = chatConnection is AuthenticatedChatConnection
)
}
}
// Here success means "we received the response" even if it is reporting an error.
// This is consistent with the behavior of the OkHttpWebSocketConnection.
single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatService is UnauthenticatedChatService)))
single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatConnection is UnauthenticatedChatConnection)))
},
onFailure = { throwable ->
Log.w(TAG, "$name [sendRequest] Failure:", throwable)
@@ -238,29 +265,29 @@ class LibSignalChatConnection(
override fun sendKeepAlive() {
CHAT_SERVICE_LOCK.withLock {
if (chatService == null) {
if (isDead()) {
return
}
Log.i(TAG, "$name Sending keep alive...")
chatService!!.sendAndDebug(KEEP_ALIVE_REQUEST)
chatConnection!!.send(KEEP_ALIVE_REQUEST)
.whenComplete(
onSuccess = { debugResponse ->
onSuccess = { response ->
Log.d(TAG, "$name [sendKeepAlive] Success")
when (debugResponse!!.response.status) {
when (response!!.status) {
in 200..299 -> {
healthMonitor.onKeepAliveResponse(
sentTimestamp = Instant.now().toEpochMilli(), // ignored. can be any value
isIdentifiedWebSocket = chatService is AuthenticatedChatService
isIdentifiedWebSocket = chatConnection is AuthenticatedChatConnection
)
}
in 400..599 -> {
healthMonitor.onMessageError(debugResponse.response.status, (chatService is AuthenticatedChatService))
healthMonitor.onMessageError(response.status, (chatConnection is AuthenticatedChatConnection))
}
else -> {
Log.w(TAG, "$name [sendKeepAlive] Unsupported keep alive response status: ${debugResponse.response.status}")
Log.w(TAG, "$name [sendKeepAlive] Unsupported keep alive response status: ${response.status}")
}
}
},
@@ -310,8 +337,8 @@ class LibSignalChatConnection(
private val listener = LibSignalChatListener()
private inner class LibSignalChatListener : ChatListener {
override fun onIncomingMessage(chat: ChatService, envelope: ByteArray, serverDeliveryTimestamp: Long, sendAck: ChatListener.ServerMessageAck?) {
private inner class LibSignalChatListener : ChatConnectionListener {
override fun onIncomingMessage(chat: ChatConnection, envelope: ByteArray, serverDeliveryTimestamp: Long, sendAck: ChatConnectionListener.ServerMessageAck?) {
// NB: The order here is intentional to ensure concurrency-safety, so that when a request is pulled off the queue, its sendAck is
// already in the ackSender map, if it exists.
val internalPseudoId = nextIncomingMessageInternalPseudoId.getAndIncrement()
@@ -328,15 +355,15 @@ class LibSignalChatConnection(
incomingRequestQueue.put(incomingWebSocketRequest)
}
override fun onConnectionInterrupted(chat: ChatService, disconnectReason: ChatServiceException) {
override fun onConnectionInterrupted(chat: ChatConnection, disconnectReason: ChatServiceException) {
CHAT_SERVICE_LOCK.withLock {
Log.i(TAG, "$name connection interrupted", disconnectReason)
chatService = null
chatConnection = null
state.onNext(WebSocketConnectionState.DISCONNECTED)
}
}
override fun onQueueEmpty(chat: ChatService) {
override fun onQueueEmpty(chat: ChatConnection) {
val internalPseudoId = nextIncomingMessageInternalPseudoId.getAndIncrement()
val queueEmptyRequest = WebSocketRequestMessage(
verb = SOCKET_EMPTY_REQUEST_VERB,

View File

@@ -7,29 +7,9 @@
package org.whispersystems.signalservice.internal.websocket
import org.signal.core.util.orNull
import org.signal.libsignal.net.ChatListener
import org.signal.libsignal.net.ChatService
import org.signal.libsignal.net.Network
import org.whispersystems.signalservice.api.util.CredentialsProvider
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
/**
* Helper method to create a ChatService with optional credentials.
*/
fun Network.createChatService(
credentialsProvider: CredentialsProvider? = null,
receiveStories: Boolean,
listener: ChatListener? = null
): ChatService {
val username = credentialsProvider?.username ?: ""
val password = credentialsProvider?.password ?: ""
return if (username.isEmpty() && password.isEmpty()) {
this.createUnauthChatService(listener)
} else {
this.createAuthChatService(username, password, receiveStories, listener)
}
}
/**
* Helper method to apply settings from the SignalServiceConfiguration.
*/

View File

@@ -10,7 +10,9 @@ import io.reactivex.rxjava3.core.Single
import okhttp3.Response
import okhttp3.WebSocket
import org.signal.core.util.logging.Log
import org.signal.libsignal.net.ChatService
import org.signal.libsignal.net.ChatConnection
import org.signal.libsignal.net.Network
import org.signal.libsignal.net.UnauthenticatedChatConnection
import org.whispersystems.signalservice.api.util.CredentialsProvider
import org.whispersystems.signalservice.api.websocket.HealthMonitor
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
@@ -46,7 +48,7 @@ class ShadowingWebSocketConnection(
signalAgent: String,
healthMonitor: HealthMonitor,
allowStories: Boolean,
private val chatService: ChatService,
private val network: Network,
private val shadowPercentage: Int,
private val bridge: WebSocketShadowingBridge
) : OkHttpWebSocketConnection(
@@ -67,19 +69,33 @@ class ShadowingWebSocketConnection(
}
private val canShadow: AtomicBoolean = AtomicBoolean(false)
private val executor: ExecutorService = Executors.newSingleThreadExecutor()
private var chatConnection: UnauthenticatedChatConnection? = null
private var shadowingConnectPending = false
override fun connect(): Observable<WebSocketConnectionState> {
executor.submit {
chatService.connect().whenComplete(
onSuccess = {
canShadow.set(true)
Log.i(TAG, "Shadow socket connected.")
},
onFailure = {
canShadow.set(false)
Log.i(TAG, "Shadow socket failed to connect.")
}
)
// NB: The potential for race conditions here was introduced when we switched from ChatService's
// long lived connection model to the single-use ChatConnection model.
// At this time, we do not intend to ever use this code in production again, so I'm deferring properly
// fixing it with a refactor, and instead just doing the bare minimum to avoid an obvious race.
// If we do want to use this again in production, we should probably refactor to depend on the higher level
// LibSignalChatConnection, rather than the lower level ChatConnection API.
if (chatConnection == null && !shadowingConnectPending) {
shadowingConnectPending = true
executor.submit {
network.connectUnauthChat(null).whenComplete(
onSuccess = { connection ->
shadowingConnectPending = false
chatConnection = connection
canShadow.set(true)
Log.i(TAG, "Shadow socket connected.")
},
onFailure = {
shadowingConnectPending = false
canShadow.set(false)
Log.i(TAG, "Shadow socket failed to connect.")
}
)
}
}
return super.connect()
}
@@ -96,7 +112,7 @@ class ShadowingWebSocketConnection(
override fun disconnect() {
executor.submit {
chatService.disconnect().thenApply {
chatConnection?.disconnect()?.thenApply {
canShadow.set(false)
Log.i(TAG, "Shadow socket disconnected.")
}
@@ -133,22 +149,23 @@ class ShadowingWebSocketConnection(
}
private fun libsignalKeepAlive(actualResponse: WebsocketResponse) {
val request = ChatService.Request(
val connection = chatConnection ?: return
val request = ChatConnection.Request(
"GET",
"/v1/keepalive",
emptyMap(),
ByteArray(0),
KEEP_ALIVE_TIMEOUT.inWholeMilliseconds.toInt()
)
chatService.sendAndDebug(request)
.whenComplete(
onSuccess = {
connection.send(request)
?.whenComplete(
onSuccess = { response ->
stats.requestsCompared.incrementAndGet()
val goodStatus = (it?.response?.status ?: -1) in 200..299
val goodStatus = (response?.status ?: -1) in 200..299
if (!goodStatus) {
stats.badStatuses.incrementAndGet()
}
Log.i(TAG, "$it")
Log.i(TAG, response?.message)
},
onFailure = {
stats.requestsCompared.incrementAndGet()