Implement a libsignal-net shadowing web socket.

This commit is contained in:
moiseev-signal
2024-05-06 15:20:11 -07:00
committed by Alex Hart
parent 78bbab37fb
commit 9a0bb243cd
9 changed files with 427 additions and 42 deletions

View File

@@ -45,6 +45,7 @@ import org.thoughtcrime.securesms.jobs.TypingSendJob;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.net.DefaultWebSocketShadowingBridge;
import org.thoughtcrime.securesms.net.SignalWebSocketHealthMonitor;
import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
@@ -92,9 +93,11 @@ import org.whispersystems.signalservice.api.util.UptimeSleepTimer;
import org.whispersystems.signalservice.api.websocket.WebSocketFactory;
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration;
import org.whispersystems.signalservice.internal.websocket.LibSignalNetwork;
import org.whispersystems.signalservice.internal.websocket.ShadowingWebSocketConnection;
import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
import org.whispersystems.signalservice.internal.websocket.LibSignalChatConnection;
import org.whispersystems.signalservice.internal.websocket.OkHttpWebSocketConnection;
import org.whispersystems.signalservice.internal.websocket.WebSocketShadowingBridge;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -294,7 +297,8 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
public @NonNull SignalWebSocket provideSignalWebSocket(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier, @NonNull Supplier<LibSignalNetwork> libSignalNetworkSupplier) {
SleepTimer sleepTimer = !SignalStore.account().isFcmEnabled() || SignalStore.internalValues().isWebsocketModeForced() ? new AlarmSleepTimer(context) : new UptimeSleepTimer() ;
SignalWebSocketHealthMonitor healthMonitor = new SignalWebSocketHealthMonitor(context, sleepTimer);
SignalWebSocket signalWebSocket = new SignalWebSocket(provideWebSocketFactory(signalServiceConfigurationSupplier, healthMonitor, libSignalNetworkSupplier));
WebSocketShadowingBridge bridge = new DefaultWebSocketShadowingBridge(context);
SignalWebSocket signalWebSocket = new SignalWebSocket(provideWebSocketFactory(signalServiceConfigurationSupplier, healthMonitor, libSignalNetworkSupplier, bridge));
healthMonitor.monitor(signalWebSocket);
@@ -401,7 +405,11 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
return provideClientZkOperations(signalServiceConfiguration).getReceiptOperations();
}
@NonNull WebSocketFactory provideWebSocketFactory(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier, @NonNull SignalWebSocketHealthMonitor healthMonitor, @NonNull Supplier<LibSignalNetwork> libSignalNetworkSupplier) {
@NonNull WebSocketFactory provideWebSocketFactory(@NonNull Supplier<SignalServiceConfiguration> signalServiceConfigurationSupplier,
@NonNull SignalWebSocketHealthMonitor healthMonitor,
@NonNull Supplier<LibSignalNetwork> libSignalNetworkSupplier,
@NonNull WebSocketShadowingBridge bridge)
{
return new WebSocketFactory() {
@Override
public WebSocketConnection createWebSocket() {
@@ -415,6 +423,20 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
@Override
public WebSocketConnection createUnidentifiedWebSocket() {
int shadowPercentage = FeatureFlags.libSignalWebSocketShadowingPercentage();
if (shadowPercentage > 0) {
return new ShadowingWebSocketConnection(
"unauth-shadow",
signalServiceConfigurationSupplier.get(),
Optional.empty(),
BuildConfig.SIGNAL_AGENT,
healthMonitor,
Stories.isFeatureEnabled(),
libSignalNetworkSupplier.get().createChatService(null),
shadowPercentage,
bridge
);
}
if (FeatureFlags.libSignalWebSocketEnabled()) {
LibSignalNetwork network = libSignalNetworkSupplier.get();
return new LibSignalChatConnection(

View File

@@ -32,6 +32,7 @@ public final class InternalValues extends SignalStoreValues {
public static final String LAST_SCROLL_POSITION = "internal.last_scroll_position";
public static final String CONVERSATION_ITEM_V2_MEDIA = "internal.conversation_item_v2_media";
public static final String FORCE_ENTER_RESTORE_V2_FLOW = "internal.force_enter_restore_v2_flow";
public static final String WEB_SOCKET_SHADOWING_STATS = "internal.web_socket_shadowing_stats";
InternalValues(KeyValueStore store) {
super(store);
@@ -219,4 +220,13 @@ public final class InternalValues extends SignalStoreValues {
public boolean enterRestoreV2Flow() {
return FeatureFlags.restoreAfterRegistration() && getBoolean(FORCE_ENTER_RESTORE_V2_FLOW, false);
}
public synchronized void setWebSocketShadowingStats(byte[] bytes) {
putBlob(WEB_SOCKET_SHADOWING_STATS, bytes);
}
public synchronized byte[] getWebSocketShadowingStats(byte[] defaultValue) {
return getBlob(WEB_SOCKET_SHADOWING_STATS, defaultValue);
}
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.net
import android.app.Application
import android.app.Notification
import android.app.PendingIntent
import android.content.Intent
import androidx.core.app.NotificationCompat
import androidx.core.app.NotificationManagerCompat
import org.signal.core.util.PendingIntentFlags
import org.thoughtcrime.securesms.R
import org.thoughtcrime.securesms.keyvalue.InternalValues
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.logsubmit.SubmitDebugLogActivity
import org.thoughtcrime.securesms.notifications.NotificationChannels
import org.thoughtcrime.securesms.notifications.NotificationIds
import org.thoughtcrime.securesms.util.FeatureFlags
import org.whispersystems.signalservice.internal.websocket.WebSocketShadowingBridge
/**
* Implements a [WebSocketShadowingBridge] to provide shadowing-specific functionality to
* [org.whispersystems.signalservice.internal.websocket.ShadowingWebSocketConnection]
*/
class DefaultWebSocketShadowingBridge(private val context: Application) : WebSocketShadowingBridge {
private val store: InternalValues = SignalStore.internalValues()
override fun writeStatsSnapshot(bytes: ByteArray) {
store.setWebSocketShadowingStats(bytes)
}
override fun readStatsSnapshot(): ByteArray? {
return store.getWebSocketShadowingStats(null)
}
override fun triggerFailureNotification(message: String) {
if (!FeatureFlags.internalUser()) {
return
}
val notification: Notification = NotificationCompat.Builder(context, NotificationChannels.getInstance().FAILURES)
.setSmallIcon(R.drawable.ic_notification)
.setContentTitle("[Internal-only] $message")
.setContentText("Tap to send a debug log")
.setContentIntent(
PendingIntent.getActivity(
context,
0,
Intent(context, SubmitDebugLogActivity::class.java),
PendingIntentFlags.mutable()
)
)
.build()
NotificationManagerCompat.from(context).notify(NotificationIds.INTERNAL_ERROR, notification)
}
}

View File

@@ -130,6 +130,7 @@ public final class FeatureFlags {
private static final String REGISTRATION_V2 = "android.registration.v2";
private static final String LIBSIGNAL_WEB_SOCKET_ENABLED = "android.libsignalWebSocketEnabled";
private static final String RESTORE_POST_REGISTRATION = "android.registration.restorePostRegistration";
private static final String LIBSIGNAL_WEB_SOCKET_SHADOW_PCT = "android.libsignalWebSocketShadowingPercentage";
/**
* We will only store remote values for flags in this set. If you want a flag to be controllable
@@ -209,7 +210,8 @@ public final class FeatureFlags {
RX_MESSAGE_SEND,
LINKED_DEVICE_LIFESPAN_SECONDS,
CAMERAX_CUSTOM_CONTROLLER,
LIBSIGNAL_WEB_SOCKET_ENABLED
LIBSIGNAL_WEB_SOCKET_ENABLED,
LIBSIGNAL_WEB_SOCKET_SHADOW_PCT
);
@VisibleForTesting
@@ -755,6 +757,15 @@ public final class FeatureFlags {
return getBoolean(RESTORE_POST_REGISTRATION, false);
}
/**
* Percentage [0, 100] of web socket requests that will be "shadowed" by sending
* an unauthenticated keep-alive via libsignal-net. Default: 0
*/
public static int libSignalWebSocketShadowingPercentage() {
int value = getInteger(LIBSIGNAL_WEB_SOCKET_SHADOW_PCT, 0);
return Math.max(0, Math.min(value, 100));
}
/** Only for rendering debug info. */
public static synchronized @NonNull Map<String, Object> getMemoryValues() {
return new TreeMap<>(REMOTE_VALUES);

View File

@@ -0,0 +1,25 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.internal.util
import org.signal.libsignal.internal.CompletableFuture
/**
* A Kotlin friendly adapter for [org.signal.libsignal.internal.CompletableFuture.whenComplete]
* taking two callbacks ([onSuccess] and [onFailure]) instead of a [java.util.function.BiConsumer].
*/
fun <T> CompletableFuture<T>.whenComplete(
onSuccess: ((T?) -> Unit),
onFailure: ((Throwable) -> Unit)
): CompletableFuture<T> {
return this.whenComplete { value, throwable ->
if (throwable != null) {
onFailure(throwable)
} else {
onSuccess(value)
}
}
}

View File

@@ -11,10 +11,10 @@ import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.BehaviorSubject
import io.reactivex.rxjava3.subjects.SingleSubject
import org.signal.core.util.logging.Log
import org.signal.libsignal.internal.CompletableFuture
import org.signal.libsignal.net.ChatService
import org.whispersystems.signalservice.api.websocket.HealthMonitor
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.internal.util.whenComplete
import java.time.Instant
import java.util.Optional
import kotlin.time.Duration.Companion.seconds
@@ -52,6 +52,31 @@ class LibSignalChatConnection(
ByteArray(0),
SEND_TIMEOUT.toInt()
)
private fun WebSocketRequestMessage.toLibSignalRequest(timeout: Long = SEND_TIMEOUT): LibSignalRequest {
return LibSignalRequest(
this.verb?.uppercase() ?: "GET",
this.path ?: "",
this.headers.associate {
val parts = it.split(':', limit = 2)
if (parts.size != 2) {
throw IllegalArgumentException("Headers must contain at least one colon")
}
parts[0] to parts[1]
},
this.body?.toByteArray() ?: byteArrayOf(),
timeout.toInt()
)
}
private fun LibSignalResponse.toWebsocketResponse(isUnidentified: Boolean): WebsocketResponse {
return WebsocketResponse(
this.status,
this.body.decodeToString(),
this.headers,
isUnidentified
)
}
}
override val name = "[$name:${System.identityHashCode(this)}]"
@@ -176,42 +201,4 @@ class LibSignalChatConnection(
override fun sendResponse(response: WebSocketResponseMessage?) {
throw NotImplementedError()
}
private fun WebSocketRequestMessage.toLibSignalRequest(timeout: Long = SEND_TIMEOUT): LibSignalRequest {
return LibSignalRequest(
this.verb?.uppercase() ?: "GET",
this.path ?: "",
this.headers.associate {
val parts = it.split(':', limit = 2)
if (parts.size != 2) {
throw IllegalArgumentException("Headers must contain at least one colon")
}
parts[0] to parts[1]
},
this.body?.toByteArray() ?: byteArrayOf(),
timeout.toInt()
)
}
private fun LibSignalResponse.toWebsocketResponse(isUnidentified: Boolean): WebsocketResponse {
return WebsocketResponse(
this.status,
this.body.decodeToString(),
this.headers,
isUnidentified
)
}
private fun <T> CompletableFuture<T>.whenComplete(
onSuccess: ((T?) -> Unit),
onFailure: ((Throwable) -> Unit)
): CompletableFuture<T> {
return this.whenComplete { value, throwable ->
if (throwable != null) {
onFailure(throwable)
} else {
onSuccess(value)
}
}
}
}

View File

@@ -0,0 +1,227 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.internal.websocket
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import okhttp3.Response
import okhttp3.WebSocket
import org.signal.core.util.logging.Log
import org.signal.libsignal.net.ChatService
import org.whispersystems.signalservice.api.util.CredentialsProvider
import org.whispersystems.signalservice.api.websocket.HealthMonitor
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState
import org.whispersystems.signalservice.internal.configuration.SignalServiceConfiguration
import org.whispersystems.signalservice.internal.util.whenComplete
import java.util.Optional
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
/**
* A wrapper on top of [OkHttpWebSocketConnection] that sends a keep alive via
* libsignal-net for a configurable percentage of the _successful_ web socket requests.
*
* Stats are collected for the shadowed requests and persisted across app restarts
* using [org.thoughtcrime.securesms.keyvalue.InternalValues].
*
* When a hardcoded error threshold is reached, the user is notified to submit debug logs.
*
* @see [org.thoughtcrime.securesms.util.FeatureFlags.libSignalWebSocketShadowingPercentage]
*/
class ShadowingWebSocketConnection(
name: String,
serviceConfiguration: SignalServiceConfiguration,
credentialsProvider: Optional<CredentialsProvider>,
signalAgent: String,
healthMonitor: HealthMonitor,
allowStories: Boolean,
private val chatService: ChatService,
private val shadowPercentage: Int,
private val bridge: WebSocketShadowingBridge
) : OkHttpWebSocketConnection(
name,
serviceConfiguration,
credentialsProvider,
signalAgent,
healthMonitor,
allowStories
) {
private var stats: Stats = try {
bridge.readStatsSnapshot()?.let {
Stats.fromSnapshot(it)
} ?: Stats()
} catch (ex: Exception) {
Log.w(TAG, "Failed to restore Stats from a snapshot.", ex)
Stats()
}
private val canShadow: AtomicBoolean = AtomicBoolean(false)
private val executor: ExecutorService = Executors.newSingleThreadExecutor()
override fun connect(): Observable<WebSocketConnectionState> {
executor.submit {
chatService.connectUnauthenticated().whenComplete(
onSuccess = {
canShadow.set(true)
Log.i(TAG, "Shadow socket connected.")
},
onFailure = {
canShadow.set(false)
Log.i(TAG, "Shadow socket failed to connect.")
}
)
}
return super.connect()
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
saveSnapshot()
super.onClosing(webSocket, code, reason)
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
saveSnapshot()
super.onFailure(webSocket, t, response)
}
override fun disconnect() {
executor.submit {
chatService.disconnect().thenApply {
canShadow.set(false)
Log.i(TAG, "Shadow socket disconnected.")
}
}
super.disconnect()
}
override fun sendRequest(request: WebSocketRequestMessage): Single<WebsocketResponse> {
return super.sendRequest(request).doOnSuccess(::sendShadow)
}
private fun sendShadow(actualResponse: WebsocketResponse) {
executor.submit {
if (canShadow.get() && Random.nextInt(100) < this.shadowPercentage) {
libsignalKeepAlive(actualResponse)
val snapshotAge = System.currentTimeMillis() - stats.lastSnapshot.get()
if (snapshotAge > SNAPSHOT_INTERVAL.inWholeMilliseconds) {
saveSnapshot()
}
}
}
}
private fun shouldSubmitLogs(): Boolean {
val requestsCompared = stats.requestsCompared.get()
// Should not happen in practice, but helps avoid a division by zero later if it does.
if (requestsCompared == 0) {
return false
}
val timeSinceLastNotificationMs = System.currentTimeMillis() - stats.lastNotified.get()
val percentFailed = stats.failures.get() * 100 / requestsCompared
return timeSinceLastNotificationMs > FULL_DAY.inWholeMilliseconds &&
percentFailed > FAILURE_PERCENTAGE
}
private fun libsignalKeepAlive(actualResponse: WebsocketResponse) {
val request = ChatService.Request(
"GET",
"/v1/keepalive",
emptyMap(),
ByteArray(0),
KEEP_ALIVE_TIMEOUT.inWholeMilliseconds.toInt()
)
chatService.unauthenticatedSendAndDebug(request)
.whenComplete(
onSuccess = {
stats.requestsCompared.incrementAndGet()
val goodStatus = (it?.response?.status ?: -1) in 200..299
if (!goodStatus) {
stats.badStatuses.incrementAndGet()
}
stats.reconnects.addAndGet(it?.debugInfo?.reconnectCount ?: 0)
Log.i(TAG, "$it")
},
onFailure = {
stats.requestsCompared.incrementAndGet()
stats.failures.incrementAndGet()
Log.w(TAG, "Shadow request failed: reason=[$it]")
Log.i(TAG, "Actual response status=${actualResponse.status}")
if (shouldSubmitLogs()) {
Log.i(TAG, "Notification to submit logs triggered.")
bridge.triggerFailureNotification("Experimental websocket transport failures!")
stats.reset()
}
}
)
}
private fun saveSnapshot() {
executor.submit {
Log.d(TAG, "Persisting shadowing stats snapshot.")
bridge.writeStatsSnapshot(stats.snapshot())
}
}
companion object {
private val TAG: String = Log.tag(ShadowingWebSocketConnection::class.java)
private val FULL_DAY: Duration = 1.days
// If more than this percentage of shadow requests fail, the
// notification to submit logs will be triggered.
private const val FAILURE_PERCENTAGE: Int = 10
private val KEEP_ALIVE_TIMEOUT: Duration = 3.seconds
private val SNAPSHOT_INTERVAL: Duration = 10.minutes
}
class Stats(
requestsCompared: Int = 0,
failures: Int = 0,
badStatuses: Int = 0,
reconnects: Int = 0,
lastNotified: Long = 0
) {
val requestsCompared: AtomicInteger = AtomicInteger(requestsCompared)
val failures: AtomicInteger = AtomicInteger(failures)
val badStatuses: AtomicInteger = AtomicInteger(badStatuses)
val reconnects: AtomicInteger = AtomicInteger(reconnects)
val lastNotified: AtomicLong = AtomicLong(lastNotified)
val lastSnapshot: AtomicLong = AtomicLong(0)
fun reset() {
requestsCompared.set(0)
failures.set(0)
badStatuses.set(0)
reconnects.set(0)
// Do not reset lastNotified nor lastSnapshot
}
companion object {
fun fromSnapshot(bytes: ByteArray): Stats {
val snapshot = Snapshot.ADAPTER.decode(bytes)
return Stats(snapshot.requestsCompared, snapshot.failures, snapshot.badStatuses, snapshot.reconnects, snapshot.lastNotified)
}
}
fun snapshot(): ByteArray {
lastSnapshot.set(System.currentTimeMillis())
return Snapshot.Builder()
.requestsCompared(requestsCompared.get())
.failures(failures.get())
.badStatuses(badStatuses.get())
.reconnects(reconnects.get())
.lastNotified(lastNotified.get())
.build()
.encode()
}
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.internal.websocket
/**
* An interface to support app<->signal-service interop for the purposes of web socket shadowing.
*/
interface WebSocketShadowingBridge {
/**
* Persist shadowing stats snapshot.
*/
fun writeStatsSnapshot(bytes: ByteArray)
/**
* Restore shadowing stats snapshot.
*/
fun readStatsSnapshot(): ByteArray?
/**
* Display a notification the user to submit debug logs, with a custom message.
*/
fun triggerFailureNotification(message: String)
}

View File

@@ -0,0 +1,18 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
syntax = "proto3";
package signalservice;
option java_package = "org.whispersystems.signalservice.internal.websocket";
option java_outer_classname = "ShadowingStats";
message Snapshot {
int32 requestsCompared = 1;
int32 failures = 2;
int32 badStatuses = 3;
int32 reconnects = 4;
int64 lastNotified = 5;
}