Prevent thread starvation during message sending.

This commit is contained in:
Cody Henthorne
2024-04-01 12:41:49 -04:00
committed by Greyson Parrelli
parent 6c697fad8b
commit 08a407dc23

View File

@@ -186,6 +186,7 @@ public class SignalServiceMessageSender {
private final MessagingService messagingService;
private final ExecutorService executor;
private final Scheduler scheduler;
private final long maxEnvelopeSize;
private final boolean useRxMessageSend;
@@ -215,6 +216,7 @@ public class SignalServiceMessageSender {
this.maxEnvelopeSize = maxEnvelopeSize;
this.localPniIdentity = store.pni().getIdentityKeyPair();
this.useRxMessageSend = useRxMessageSend;
this.scheduler = Schedulers.from(executor, false, false);
}
/**
@@ -2151,7 +2153,7 @@ public class SignalServiceMessageSender {
List<SendMessageResult> results;
try {
results = Observable.mergeDelayError(singleResults, Integer.MAX_VALUE, 1)
.observeOn(Schedulers.io(), true)
.observeOn(scheduler, true)
.scan(new ArrayList<SendMessageResult>(singleResults.size()), (state, result) -> {
state.add(result);
if (partialListener != null) {
@@ -2247,7 +2249,7 @@ public class SignalServiceMessageSender {
return messagingService.send(messages, unidentifiedAccess, story)
.map(r -> new kotlin.Pair<>(messages, r));
})
.observeOn(Schedulers.io())
.observeOn(scheduler)
.flatMap(pair -> {
final OutgoingPushMessageList messages = pair.getFirst();
final ServiceResponse<SendMessageResponse> serviceResponse = pair.getSecond();
@@ -2295,7 +2297,7 @@ public class SignalServiceMessageSender {
System.currentTimeMillis() - startTime,
content.getContent()
);
}).subscribeOn(Schedulers.io());
}).subscribeOn(scheduler);
}
});