Remove rx send remote config and group send using rx always.

This commit is contained in:
Cody Henthorne
2024-06-18 12:03:12 -04:00
committed by Greyson Parrelli
parent 5ecf60a306
commit 3551e7ec00
3 changed files with 13 additions and 120 deletions

View File

@@ -144,8 +144,7 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
provideGroupsV2Operations(signalServiceConfiguration).getProfileOperations(),
SignalExecutors.newCachedBoundedExecutor("signal-messages", ThreadUtil.PRIORITY_IMPORTANT_BACKGROUND_THREAD, 1, 16, 30),
ByteUnit.KILOBYTES.toBytes(256),
RemoteConfig.okHttpAutomaticRetry(),
RemoteConfig.useRxMessageSending());
RemoteConfig.okHttpAutomaticRetry());
}
@Override

View File

@@ -968,15 +968,6 @@ object RemoteConfig {
hotSwappable = true
)
/** Use Rx threading model to do sends. */
@JvmStatic
@get:JvmName("useRxMessageSending")
val useRxMessageSending: Boolean by remoteBoolean(
key = "android.rxMessageSend.2",
defaultValue = false,
hotSwappable = true
)
/** The lifespan of a linked device (i.e. the time it can be inactive for before it expires), in milliseconds. */
@JvmStatic
val linkedDeviceLifespan: Long by remoteValue(

View File

@@ -188,7 +188,6 @@ public class SignalServiceMessageSender {
private final ExecutorService executor;
private final Scheduler scheduler;
private final long maxEnvelopeSize;
private final boolean useRxMessageSend;
public SignalServiceMessageSender(SignalServiceConfiguration urls,
CredentialsProvider credentialsProvider,
@@ -200,8 +199,7 @@ public class SignalServiceMessageSender {
ClientZkProfileOperations clientZkProfileOperations,
ExecutorService executor,
long maxEnvelopeSize,
boolean automaticNetworkRetry,
boolean useRxMessageSend)
boolean automaticNetworkRetry)
{
this.socket = new PushServiceSocket(urls, credentialsProvider, signalAgent, clientZkProfileOperations, automaticNetworkRetry);
this.aciStore = store.aci();
@@ -215,7 +213,6 @@ public class SignalServiceMessageSender {
this.executor = executor != null ? executor : Executors.newSingleThreadExecutor();
this.maxEnvelopeSize = maxEnvelopeSize;
this.localPniIdentity = store.pni().getIdentityKeyPair();
this.useRxMessageSend = useRxMessageSend;
this.scheduler = Schedulers.from(executor, false, false);
}
@@ -1936,100 +1933,6 @@ public class SignalServiceMessageSender {
return SignalServiceSyncMessage.forSentTranscript(transcript);
}
private List<SendMessageResult> sendMessage(List<SignalServiceAddress> recipients,
List<Optional<UnidentifiedAccess>> unidentifiedAccess,
long timestamp,
EnvelopeContent content,
boolean online,
PartialSendCompleteListener partialListener,
CancelationSignal cancelationSignal,
SendEvents sendEvents,
boolean urgent,
boolean story)
throws IOException
{
if (useRxMessageSend) {
return sendMessageRx(recipients, unidentifiedAccess, timestamp, content, online, partialListener, cancelationSignal, sendEvents, urgent, story);
}
Log.d(TAG, "[" + timestamp + "] Sending to " + recipients.size() + " recipients.");
enforceMaxContentSize(content);
long startTime = System.currentTimeMillis();
List<Future<SendMessageResult>> futureResults = new LinkedList<>();
Iterator<SignalServiceAddress> recipientIterator = recipients.iterator();
Iterator<Optional<UnidentifiedAccess>> unidentifiedAccessIterator = unidentifiedAccess.iterator();
while (recipientIterator.hasNext()) {
SignalServiceAddress recipient = recipientIterator.next();
Optional<UnidentifiedAccess> access = unidentifiedAccessIterator.next();
futureResults.add(executor.submit(() -> {
SendMessageResult result = sendMessage(recipient, access, timestamp, content, online, cancelationSignal, sendEvents, urgent, story);
if (partialListener != null) {
partialListener.onPartialSendComplete(result);
}
return result;
}));
}
List<SendMessageResult> results = new ArrayList<>(futureResults.size());
recipientIterator = recipients.iterator();
for (Future<SendMessageResult> futureResult : futureResults) {
SignalServiceAddress recipient = recipientIterator.next();
try {
results.add(futureResult.get());
} catch (ExecutionException e) {
if (e.getCause() instanceof UntrustedIdentityException) {
Log.w(TAG, "[" + timestamp + "] Hit identity mismatch: " + recipient.getIdentifier(), e);
results.add(SendMessageResult.identityFailure(recipient, ((UntrustedIdentityException) e.getCause()).getIdentityKey()));
} else if (e.getCause() instanceof UnregisteredUserException) {
Log.w(TAG, "[" + timestamp + "] Hit unregistered user: " + recipient.getIdentifier());
results.add(SendMessageResult.unregisteredFailure(recipient));
} else if (e.getCause() instanceof PushNetworkException) {
Log.w(TAG, "[" + timestamp + "] Hit network failure: " + recipient.getIdentifier(), e);
results.add(SendMessageResult.networkFailure(recipient));
} else if (e.getCause() instanceof ServerRejectedException) {
Log.w(TAG, "[" + timestamp + "] Hit server rejection: " + recipient.getIdentifier(), e);
throw ((ServerRejectedException) e.getCause());
} else if (e.getCause() instanceof ProofRequiredException) {
Log.w(TAG, "[" + timestamp + "] Hit proof required: " + recipient.getIdentifier(), e);
results.add(SendMessageResult.proofRequiredFailure(recipient, (ProofRequiredException) e.getCause()));
} else if (e.getCause() instanceof RateLimitException) {
Log.w(TAG, "[" + timestamp + "] Hit rate limit: " + recipient.getIdentifier(), e);
results.add(SendMessageResult.rateLimitFailure(recipient, (RateLimitException) e.getCause()));
} else if (e.getCause() instanceof InvalidPreKeyException) {
Log.w(TAG, "[" + timestamp + "] Hit invalid prekey: " + recipient.getIdentifier(), e);
results.add(SendMessageResult.invalidPreKeyFailure(recipient));
} else {
Log.w(TAG, "[" + timestamp + "] Hit unknown exception: " + recipient.getIdentifier(), e);
throw new IOException(e);
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
double sendsForAverage = 0;
for (SendMessageResult result : results) {
if (result.getSuccess() != null && result.getSuccess().getDuration() != -1) {
sendsForAverage++;
}
}
double average = 0;
if (sendsForAverage > 0) {
for (SendMessageResult result : results) {
if (result.getSuccess() != null && result.getSuccess().getDuration() != -1) {
average += result.getSuccess().getDuration() / sendsForAverage;
}
}
}
Log.d(TAG, "[" + timestamp + "] Completed send to " + recipients.size() + " recipients in " + (System.currentTimeMillis() - startTime) + " ms, with an average time of " + Math.round(average) + " ms per send.");
return results;
}
private SendMessageResult sendMessage(SignalServiceAddress recipient,
Optional<UnidentifiedAccess> unidentifiedAccess,
long timestamp,
@@ -2134,19 +2037,19 @@ public class SignalServiceMessageSender {
* @return An unordered list of a {@link SendMessageResult} for each send.
* @throws IOException - Unknown failure or a failure not representable by an unsuccessful {@code SendMessageResult}.
*/
private List<SendMessageResult> sendMessageRx(List<SignalServiceAddress> recipients,
List<Optional<UnidentifiedAccess>> unidentifiedAccess,
long timestamp,
EnvelopeContent content,
boolean online,
PartialSendCompleteListener partialListener,
CancelationSignal cancelationSignal,
@Nullable SendEvents sendEvents,
boolean urgent,
boolean story)
private List<SendMessageResult> sendMessage(List<SignalServiceAddress> recipients,
List<Optional<UnidentifiedAccess>> unidentifiedAccess,
long timestamp,
EnvelopeContent content,
boolean online,
PartialSendCompleteListener partialListener,
CancelationSignal cancelationSignal,
@Nullable SendEvents sendEvents,
boolean urgent,
boolean story)
throws IOException
{
Log.d(TAG, "[" + timestamp + "] Sending to " + recipients.size() + " recipients via Rx.");
Log.d(TAG, "[" + timestamp + "] Sending to " + recipients.size() + " recipients.");
enforceMaxContentSize(content);
long startTime = System.currentTimeMillis();