From 116f702be6ab68240ce0fe0be2abe1f93b3f3305 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Wed, 29 Apr 2026 11:30:23 -0400 Subject: [PATCH] Add Live Queries tab to Spinner. --- .../org/signal/core/util/tracing/Tracer.java | 59 +- lib/spinner/src/main/assets/live.hbs | 761 ++++++++++++++++++ .../src/main/assets/partials/prefix.hbs | 1 + .../org/signal/spinner/SpinnerQueryEvent.kt | 51 ++ .../signal/spinner/SpinnerQueryWebSocket.kt | 163 ++++ .../java/org/signal/spinner/SpinnerServer.kt | 32 +- 6 files changed, 1056 insertions(+), 11 deletions(-) create mode 100644 lib/spinner/src/main/assets/live.hbs create mode 100644 lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryEvent.kt create mode 100644 lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryWebSocket.kt diff --git a/core/util/src/main/java/org/signal/core/util/tracing/Tracer.java b/core/util/src/main/java/org/signal/core/util/tracing/Tracer.java index 98ce13238a..675d40d74a 100644 --- a/core/util/src/main/java/org/signal/core/util/tracing/Tracer.java +++ b/core/util/src/main/java/org/signal/core/util/tracing/Tracer.java @@ -15,6 +15,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -50,7 +51,7 @@ public final class Tracer { public static final class TrackId { public static final long DB_LOCK = -8675309; - private static final String DB_LOCK_NAME = "Database Lock"; + public static final String DB_LOCK_NAME = "Database Lock"; } private static final Tracer INSTANCE = new Tracer(); @@ -63,16 +64,26 @@ public final class Tracer { private final Map threadPackets; private final Queue eventPackets; private final AtomicInteger eventCount; + private final List eventListeners; private long lastSyncTime; private long maxBufferSize; private Tracer() { - this.clock = SystemClock::elapsedRealtimeNanos; - this.threadPackets = new ConcurrentHashMap<>(); - this.eventPackets = new ConcurrentLinkedQueue<>(); - this.eventCount = new AtomicInteger(0); - this.maxBufferSize = 3_500; + this.clock = SystemClock::elapsedRealtimeNanos; + this.threadPackets = new ConcurrentHashMap<>(); + this.eventPackets = new ConcurrentLinkedQueue<>(); + this.eventCount = new AtomicInteger(0); + this.eventListeners = new CopyOnWriteArrayList<>(); + this.maxBufferSize = 3_500; + } + + public void addEventListener(@NonNull EventListener listener) { + eventListeners.add(listener); + } + + public void removeEventListener(@NonNull EventListener listener) { + eventListeners.remove(listener); } public static @NonNull Tracer getInstance() { @@ -116,14 +127,36 @@ public final class Tracer { } addPacket(forMethodStart(methodName, time, trackId, values)); + + if (!eventListeners.isEmpty()) { + String trackName = (trackId == TrackId.DB_LOCK) ? TrackId.DB_LOCK_NAME : Thread.currentThread().getName(); + for (EventListener listener : eventListeners) { + listener.onStart(methodName, trackId, trackName, time, values); + } + } } public void end(@NonNull String methodName) { - addPacket(forMethodEnd(methodName, clock.getTimeNanos(), Thread.currentThread().getId())); + long time = clock.getTimeNanos(); + long threadId = Thread.currentThread().getId(); + + addPacket(forMethodEnd(methodName, time, threadId)); + notifyEnd(methodName, threadId, time); } public void end(@NonNull String methodName, long trackId) { - addPacket(forMethodEnd(methodName, clock.getTimeNanos(), trackId)); + long time = clock.getTimeNanos(); + addPacket(forMethodEnd(methodName, time, trackId)); + notifyEnd(methodName, trackId, time); + } + + private void notifyEnd(@NonNull String methodName, long trackId, long time) { + if (eventListeners.isEmpty()) { + return; + } + for (EventListener listener : eventListeners) { + listener.onEnd(methodName, trackId, time); + } } public @NonNull byte[] serialize() { @@ -233,4 +266,14 @@ public final class Tracer { private interface Clock { long getTimeNanos(); } + + /** + * Optional listener that observes raw start/end events as they flow through the tracer. Listeners + * are invoked synchronously on whatever thread fired the event, so implementations must be cheap + * and non-blocking. Note that nothing else is guaranteed about ordering across threads. + */ + public interface EventListener { + void onStart(@NonNull String name, long trackId, @NonNull String trackName, long timestampNanos, @Nullable Map values); + void onEnd(@NonNull String name, long trackId, long timestampNanos); + } } diff --git a/lib/spinner/src/main/assets/live.hbs b/lib/spinner/src/main/assets/live.hbs new file mode 100644 index 0000000000..61c4772fb8 --- /dev/null +++ b/lib/spinner/src/main/assets/live.hbs @@ -0,0 +1,761 @@ + + {{> partials/head title="Live Queries" }} + + + + + + + {{> partials/prefix isLive=true}} + +
+ + + +
0 spans
+
+
+ +
+
+
+ +
+
+
+
+
+
+
+ +
+
+
+
+
+ + {{> partials/suffix }} + + + + diff --git a/lib/spinner/src/main/assets/partials/prefix.hbs b/lib/spinner/src/main/assets/partials/prefix.hbs index b1480ccee1..d4446138e1 100644 --- a/lib/spinner/src/main/assets/partials/prefix.hbs +++ b/lib/spinner/src/main/assets/partials/prefix.hbs @@ -31,6 +31,7 @@
  • Browse
  • Query
  • Recent
  • +
  • Live Queries
  • Logs
  • {{#each plugins}}
  • {{name}}
  • diff --git a/lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryEvent.kt b/lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryEvent.kt new file mode 100644 index 0000000000..0fbc36a19e --- /dev/null +++ b/lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryEvent.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.signal.spinner + +import org.json.JSONObject + +internal sealed class SpinnerQueryEvent { + + abstract fun serialize(): String + + data class Start( + val trackId: Long, + val trackName: String, + val name: String, + val query: String?, + val table: String?, + val holder: String?, + val timestampMs: Long + ) : SpinnerQueryEvent() { + override fun serialize(): String { + val out = JSONObject() + out.put("type", "start") + out.put("trackId", trackId) + out.put("trackName", trackName) + out.put("name", name) + query?.let { out.put("query", it) } + table?.let { out.put("table", it) } + holder?.let { out.put("holder", it) } + out.put("t", timestampMs) + return out.toString(0) + } + } + + data class End( + val trackId: Long, + val name: String, + val timestampMs: Long + ) : SpinnerQueryEvent() { + override fun serialize(): String { + val out = JSONObject() + out.put("type", "end") + out.put("trackId", trackId) + out.put("name", name) + out.put("t", timestampMs) + return out.toString(0) + } + } +} diff --git a/lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryWebSocket.kt b/lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryWebSocket.kt new file mode 100644 index 0000000000..156a904ace --- /dev/null +++ b/lib/spinner/src/main/java/org/signal/spinner/SpinnerQueryWebSocket.kt @@ -0,0 +1,163 @@ +/* + * Copyright 2026 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.signal.spinner + +import android.annotation.SuppressLint +import android.util.Log +import fi.iki.elonen.NanoHTTPD +import fi.iki.elonen.NanoWSD +import fi.iki.elonen.NanoWSD.WebSocket +import org.signal.core.util.tracing.Tracer +import java.io.IOException +import java.util.ArrayDeque +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +@SuppressLint("LogNotSignal") +internal class SpinnerQueryWebSocket(handshakeRequest: NanoHTTPD.IHTTPSession) : WebSocket(handshakeRequest) { + + companion object { + private const val TAG = "SpinnerQueryWebSocket" + + private const val MAX_PENDING = 10_000 + + private val pending: ArrayDeque = ArrayDeque() + private val openSockets: MutableList = mutableListOf() + + private val lock = ReentrantLock() + private val condition = lock.newCondition() + + private val dispatchThread: DispatchThread = DispatchThread().also { it.start() } + + /** Per-track stack of currently-forwarded frame names. Used to drop end events for frames we filtered out. */ + private val forwardedFrames: ConcurrentHashMap> = ConcurrentHashMap() + + private val tracerListener = object : Tracer.EventListener { + override fun onStart(name: String, trackId: Long, trackName: String, timestampNanos: Long, values: Map?) { + val query = values?.get("query") + val isLockEvent = trackId == Tracer.TrackId.DB_LOCK + if (query == null && !isLockEvent) { + return + } + + val stack = forwardedFrames.getOrPut(trackId) { ArrayDeque() } + synchronized(stack) { + stack.addLast(name) + } + + enqueue( + SpinnerQueryEvent.Start( + trackId = trackId, + trackName = trackName, + name = name, + query = query, + table = values?.get("table"), + holder = if (isLockEvent) values?.get("thread") else null, + timestampMs = timestampNanos / 1_000_000L + ) + ) + } + + override fun onEnd(name: String, trackId: Long, timestampNanos: Long) { + val stack = forwardedFrames[trackId] ?: return + val matched = synchronized(stack) { + if (stack.isNotEmpty() && stack.last() == name) { + stack.removeLast() + true + } else { + false + } + } + if (!matched) { + return + } + + enqueue( + SpinnerQueryEvent.End( + trackId = trackId, + name = name, + timestampMs = timestampNanos / 1_000_000L + ) + ) + } + } + + private fun enqueue(event: SpinnerQueryEvent) { + lock.withLock { + if (openSockets.isEmpty()) { + return + } + pending += event + if (pending.size > MAX_PENDING) { + pending.removeFirst() + } + condition.signal() + } + } + } + + override fun onOpen() { + Log.d(TAG, "onOpen()") + + val firstSocket = lock.withLock { + openSockets += this + condition.signal() + openSockets.size == 1 + } + + if (firstSocket) { + Tracer.getInstance().addEventListener(tracerListener) + } + } + + override fun onClose(code: NanoWSD.WebSocketFrame.CloseCode, reason: String?, initiatedByRemote: Boolean) { + Log.d(TAG, "onClose()") + + val lastSocket = lock.withLock { + openSockets -= this + openSockets.isEmpty() + } + + if (lastSocket) { + Tracer.getInstance().removeEventListener(tracerListener) + forwardedFrames.clear() + lock.withLock { + pending.clear() + } + } + } + + override fun onMessage(message: NanoWSD.WebSocketFrame) = Unit + + override fun onPong(pong: NanoWSD.WebSocketFrame) = Unit + + override fun onException(exception: IOException) { + Log.d(TAG, "onException()", exception) + } + + private class DispatchThread : Thread("SpinnerQuery") { + override fun run() { + while (true) { + val (sockets, event) = lock.withLock { + while (pending.isEmpty() || openSockets.isEmpty()) { + condition.await() + } + openSockets.toList() to pending.removeFirst() + } + + val payload = event.serialize() + sockets.forEach { socket -> + try { + socket.send(payload) + } catch (e: IOException) { + Log.w(TAG, "Failed to send a query event!", e) + } + } + } + } + } +} diff --git a/lib/spinner/src/main/java/org/signal/spinner/SpinnerServer.kt b/lib/spinner/src/main/java/org/signal/spinner/SpinnerServer.kt index beed7af199..1907062190 100644 --- a/lib/spinner/src/main/java/org/signal/spinner/SpinnerServer.kt +++ b/lib/spinner/src/main/java/org/signal/spinner/SpinnerServer.kt @@ -76,7 +76,9 @@ internal class SpinnerServer( session.method == Method.GET && session.uri == "/recent" -> getRecent(dbParam) session.method == Method.GET && session.uri == "/trace" -> getTrace() session.method == Method.GET && session.uri == "/logs" -> getLogs(dbParam) - isWebsocketRequested(session) && session.uri == "/logs/websocket" -> getLogWebSocket(session) + session.method == Method.GET && session.uri == "/live" -> getLive(dbParam) + isWebsocketRequested(session) && session.uri == "/logs/websocket" -> getWebSocketResponse(session) + isWebsocketRequested(session) && session.uri == "/live/websocket" -> getWebSocketResponse(session) else -> { val plugin = plugins[session.uri] if (plugin != null && session.method == Method.GET) { @@ -93,7 +95,10 @@ internal class SpinnerServer( } override fun openWebSocket(handshake: IHTTPSession): WebSocket { - return SpinnerLogWebSocket(handshake) + return when (handshake.uri) { + "/live/websocket" -> SpinnerQueryWebSocket(handshake) + else -> SpinnerLogWebSocket(handshake) + } } fun onSql(dbName: String, sql: String) { @@ -242,7 +247,20 @@ internal class SpinnerServer( ) } - private fun getLogWebSocket(session: IHTTPSession): Response { + private fun getLive(dbName: String): Response { + return renderTemplate( + "live", + LivePageModel( + environment = environment, + deviceInfo = deviceInfo.resolve(), + database = dbName, + databases = databases.keys.toList(), + plugins = plugins.values.toList() + ) + ) + } + + private fun getWebSocketResponse(session: IHTTPSession): Response { val headers = session.headers val webSocket = openWebSocket(session) @@ -529,6 +547,14 @@ internal class SpinnerServer( override val plugins: List ) : PrefixPageData + data class LivePageModel( + override val environment: String, + override val deviceInfo: Map, + override val database: String, + override val databases: List, + override val plugins: List + ) : PrefixPageData + data class PluginPageModel( override val environment: String, override val deviceInfo: Map,