Remove TracingExecutors.

This commit is contained in:
Greyson Parrelli
2023-04-01 01:53:52 -04:00
committed by Alex Hart
parent 0f15562a28
commit db5f8707ec
13 changed files with 15 additions and 149 deletions

View File

@@ -38,10 +38,9 @@ import java.util.Optional
class MessageContentProcessorTestV2 {
companion object {
private val TAGS = listOf(MessageContentProcessor.TAG, MessageContentProcessorV2.TAG, AttachmentTable.TAG)
private val TAGS = listOf(MessageContentProcessorV2.TAG, AttachmentTable.TAG)
private val GENERALIZE_TAG = mapOf(
MessageContentProcessor.TAG to "MCP",
MessageContentProcessorV2.TAG to "MCP",
AttachmentTable.TAG to AttachmentTable.TAG
)

View File

@@ -8,7 +8,6 @@ import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
@@ -37,7 +36,7 @@ import android.util.Log as AndroidLog
/**
* Sends N messages from Bob to Alice to track performance of Alice's processing of messages.
*/
@Ignore("Ignore test in normal testing as it's a performance test with no assertions")
// @Ignore("Ignore test in normal testing as it's a performance test with no assertions")
@RunWith(AndroidJUnit4::class)
class MessageProcessingPerformanceTest {

View File

@@ -27,7 +27,7 @@ private val TAG = Log.tag(JumboEmoji::class.java)
*/
object JumboEmoji {
private val executor = ThreadUtil.trace(SignalExecutors.newCachedSingleThreadExecutor("jumbo-emoji", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD))
private val executor = SignalExecutors.newCachedSingleThreadExecutor("jumbo-emoji", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD)
const val MAX_JUMBOJI_COUNT = 5

View File

@@ -59,7 +59,7 @@ public class JobManager implements ConstraintObserver.Notifier {
public JobManager(@NonNull Application application, @NonNull Configuration configuration) {
this.application = application;
this.configuration = configuration;
this.executor = ThreadUtil.trace(new FilteredExecutor(configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager"), ThreadUtil::isMainThread));
this.executor = new FilteredExecutor(configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager"), ThreadUtil::isMainThread);
this.jobTracker = configuration.getJobTracker();
this.jobController = new JobController(application,
configuration.getJobStorage(),

View File

@@ -38,7 +38,7 @@ public final class SignalWebSocketHealthMonitor implements HealthMonitor {
private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS);
private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3;
private final Executor executor = ThreadUtil.trace(Executors.newSingleThreadExecutor());
private final Executor executor = Executors.newSingleThreadExecutor();
private final Application context;
private SignalWebSocket signalWebSocket;

View File

@@ -49,7 +49,7 @@ public final class LiveRecipientCache {
private final AtomicBoolean warmedUp;
public LiveRecipientCache(@NonNull Context context) {
this(context, ThreadUtil.trace(new FilteredExecutor(SignalExecutors.newCachedBoundedExecutor("signal-recipients", ThreadUtil.PRIORITY_UI_BLOCKING_THREAD, 1, 4, 15), () -> !SignalDatabase.inTransaction())));
this(context, new FilteredExecutor(SignalExecutors.newCachedBoundedExecutor("signal-recipients", ThreadUtil.PRIORITY_UI_BLOCKING_THREAD, 1, 4, 15), () -> !SignalDatabase.inTransaction()));
}
@VisibleForTesting

View File

@@ -7,12 +7,7 @@ import android.os.Process;
import androidx.annotation.NonNull;
import androidx.annotation.VisibleForTesting;
import org.signal.core.util.concurrent.TracingExecutor;
import org.signal.core.util.concurrent.TracingExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* Thread related utility functions.
@@ -117,12 +112,4 @@ public final class ThreadUtil {
Thread.sleep(millis);
} catch (InterruptedException ignored) { }
}
public static Executor trace(Executor executor) {
return new TracingExecutor(executor);
}
public static ExecutorService trace(ExecutorService executor) {
return new TracingExecutorService(executor);
}
}

View File

@@ -1,9 +1,9 @@
package org.signal.core.util.concurrent
import android.os.Handler
import org.signal.core.util.ExceptionUtil
import org.signal.core.util.logging.Log
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
/**
* A class that polls active threads at a set interval and logs when multiple threads are BLOCKED.
@@ -73,7 +73,8 @@ class DeadlockDetector(private val handler: Handler, private val pollingInterval
.filter { it.key.name.startsWith(executorInfo.namePrefix) }
.toMap()
val executor: TracingExecutorService = executorInfo.executor as TracingExecutorService
val executor: ThreadPoolExecutor = executorInfo.executor as ThreadPoolExecutor
Log.w(TAG, buildLogString("Found a full executor! ${executor.activeCount}/${executor.maximumPoolSize} threads active with ${executor.queue.size} tasks queued.", fullMap))
}
lastThreadDump = threads
@@ -122,12 +123,7 @@ class DeadlockDetector(private val handler: Handler, private val pollingInterval
for (entry in blocked) {
stringBuilder.append("-- [${entry.key.id}] ${entry.key.name} | ${entry.key.state}\n")
val callerThrowable: Throwable? = TracedThreads.callerStackTraces[entry.key.id]
val stackTrace: Array<StackTraceElement> = if (callerThrowable != null) {
ExceptionUtil.joinStackTrace(entry.value, callerThrowable.stackTrace)
} else {
entry.value
}
val stackTrace: Array<StackTraceElement> = entry.value
for (element in stackTrace) {
stringBuilder.append("$element\n")
@@ -140,7 +136,7 @@ class DeadlockDetector(private val handler: Handler, private val pollingInterval
}
private fun isExecutorFull(executor: ExecutorService): Boolean {
return if (executor is TracingExecutorService) {
return if (executor is ThreadPoolExecutor) {
executor.queue.size > CONCERNING_QUEUE_THRESHOLD
} else {
false

View File

@@ -18,10 +18,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public final class SignalExecutors {
public static final ExecutorService UNBOUNDED = ThreadUtil.trace(Executors.newCachedThreadPool(new NumberedThreadFactory("signal-unbounded", ThreadUtil.PRIORITY_BACKGROUND_THREAD)));
public static final ExecutorService BOUNDED = ThreadUtil.trace(Executors.newFixedThreadPool(4, new NumberedThreadFactory("signal-bounded", ThreadUtil.PRIORITY_BACKGROUND_THREAD)));
public static final ExecutorService SERIAL = ThreadUtil.trace(Executors.newSingleThreadExecutor(new NumberedThreadFactory("signal-serial", ThreadUtil.PRIORITY_BACKGROUND_THREAD)));
public static final ExecutorService BOUNDED_IO = ThreadUtil.trace(newCachedBoundedExecutor("signal-io-bounded", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD, 1, 32, 30));
public static final ExecutorService UNBOUNDED = Executors.newCachedThreadPool(new NumberedThreadFactory("signal-unbounded", ThreadUtil.PRIORITY_BACKGROUND_THREAD));
public static final ExecutorService BOUNDED = Executors.newFixedThreadPool(4, new NumberedThreadFactory("signal-bounded", ThreadUtil.PRIORITY_BACKGROUND_THREAD));
public static final ExecutorService SERIAL = Executors.newSingleThreadExecutor(new NumberedThreadFactory("signal-serial", ThreadUtil.PRIORITY_BACKGROUND_THREAD));
public static final ExecutorService BOUNDED_IO = newCachedBoundedExecutor("signal-io-bounded", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD, 1, 32, 30);
private SignalExecutors() {}

View File

@@ -1,12 +0,0 @@
package org.signal.core.util.concurrent
import java.util.concurrent.ConcurrentHashMap
/**
* A container for keeping track of the caller stack traces of the threads we care about.
*
* Note: This should only be used for debugging. To keep overhead minimal, not much effort has been put into ensuring this map is 100% accurate.
*/
internal object TracedThreads {
val callerStackTraces: MutableMap<Long, Throwable> = ConcurrentHashMap()
}

View File

@@ -1,28 +0,0 @@
package org.signal.core.util.concurrent
import java.util.concurrent.Executor
/**
* An executor that will keep track of the stack trace at the time of calling [execute] and use that to build a more useful stack trace in the event of a crash.
*/
internal class TracingExecutor(val wrapped: Executor) : Executor by wrapped {
override fun execute(command: Runnable?) {
val callerStackTrace = Throwable()
wrapped.execute {
val currentThread: Thread = Thread.currentThread()
val currentHandler: Thread.UncaughtExceptionHandler? = currentThread.uncaughtExceptionHandler
val originalHandler: Thread.UncaughtExceptionHandler? = if (currentHandler is TracingUncaughtExceptionHandler) currentHandler.originalHandler else currentHandler
currentThread.uncaughtExceptionHandler = TracingUncaughtExceptionHandler(originalHandler, callerStackTrace)
TracedThreads.callerStackTraces.put(currentThread.id, callerStackTrace)
try {
command?.run()
} finally {
TracedThreads.callerStackTraces.remove(currentThread.id)
}
}
}
}

View File

@@ -1,58 +0,0 @@
package org.signal.core.util.concurrent
import java.util.Queue
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
/**
* An executor that will keep track of the stack trace at the time of calling [execute] and use that to build a more useful stack trace in the event of a crash.
*/
internal class TracingExecutorService(val wrapped: ExecutorService) : ExecutorService by wrapped {
override fun execute(command: Runnable?) {
val callerStackTrace = Throwable()
wrapped.execute {
val currentThread: Thread = Thread.currentThread()
val currentHandler: Thread.UncaughtExceptionHandler? = currentThread.uncaughtExceptionHandler
val originalHandler: Thread.UncaughtExceptionHandler? = if (currentHandler is TracingUncaughtExceptionHandler) currentHandler.originalHandler else currentHandler
currentThread.uncaughtExceptionHandler = TracingUncaughtExceptionHandler(originalHandler, callerStackTrace)
TracedThreads.callerStackTraces.put(currentThread.id, callerStackTrace)
try {
command?.run()
} finally {
TracedThreads.callerStackTraces.remove(currentThread.id)
}
}
}
val queue: Queue<Runnable>
get() {
return if (wrapped is ThreadPoolExecutor) {
wrapped.queue
} else {
LinkedBlockingQueue()
}
}
val activeCount: Int
get() {
return if (wrapped is ThreadPoolExecutor) {
wrapped.activeCount
} else {
0
}
}
val maximumPoolSize: Int
get() {
return if (wrapped is ThreadPoolExecutor) {
wrapped.maximumPoolSize
} else {
0
}
}
}

View File

@@ -1,17 +0,0 @@
package org.signal.core.util.concurrent
import org.signal.core.util.ExceptionUtil
/**
* An uncaught exception handler that will combine a caller stack trace with the exception to print a more useful stack trace.
*/
internal class TracingUncaughtExceptionHandler(
val originalHandler: Thread.UncaughtExceptionHandler?,
private val callerStackTrace: Throwable
) : Thread.UncaughtExceptionHandler {
override fun uncaughtException(thread: Thread, exception: Throwable) {
val updated = ExceptionUtil.joinStackTrace(exception, callerStackTrace)
originalHandler?.uncaughtException(thread, updated)
}
}