From 08a407dc23d18cf641bb32909dbe67913ad49626 Mon Sep 17 00:00:00 2001 From: Cody Henthorne Date: Mon, 1 Apr 2024 12:41:49 -0400 Subject: [PATCH] Prevent thread starvation during message sending. --- .../signalservice/api/SignalServiceMessageSender.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java index 9104c81518..0fa2b7aaff 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessageSender.java @@ -186,6 +186,7 @@ public class SignalServiceMessageSender { private final MessagingService messagingService; private final ExecutorService executor; + private final Scheduler scheduler; private final long maxEnvelopeSize; private final boolean useRxMessageSend; @@ -215,6 +216,7 @@ public class SignalServiceMessageSender { this.maxEnvelopeSize = maxEnvelopeSize; this.localPniIdentity = store.pni().getIdentityKeyPair(); this.useRxMessageSend = useRxMessageSend; + this.scheduler = Schedulers.from(executor, false, false); } /** @@ -2151,7 +2153,7 @@ public class SignalServiceMessageSender { List results; try { results = Observable.mergeDelayError(singleResults, Integer.MAX_VALUE, 1) - .observeOn(Schedulers.io(), true) + .observeOn(scheduler, true) .scan(new ArrayList(singleResults.size()), (state, result) -> { state.add(result); if (partialListener != null) { @@ -2247,7 +2249,7 @@ public class SignalServiceMessageSender { return messagingService.send(messages, unidentifiedAccess, story) .map(r -> new kotlin.Pair<>(messages, r)); }) - .observeOn(Schedulers.io()) + .observeOn(scheduler) .flatMap(pair -> { final OutgoingPushMessageList messages = pair.getFirst(); final ServiceResponse serviceResponse = pair.getSecond(); @@ -2295,7 +2297,7 @@ public class SignalServiceMessageSender { System.currentTimeMillis() - startTime, content.getContent() ); - }).subscribeOn(Schedulers.io()); + }).subscribeOn(scheduler); } });