Use libsignal-net for multi-recipient send.

This commit is contained in:
Cody Henthorne
2026-03-13 10:51:30 -04:00
committed by Michelle Tang
parent 6c1897d8d5
commit 4d2f23ec37
17 changed files with 240 additions and 249 deletions

View File

@@ -147,6 +147,11 @@ public class GroupCallUpdateSendJob extends BaseJob {
e instanceof RetryLaterException;
}
@Override
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
return SendJobUtil.getBackoffMillisFromException(this, TAG, pastAttemptCount, exception, () -> super.getNextRunAttemptBackoff(pastAttemptCount, exception));
}
@Override
public void onFailure() {
if (recipients.size() < initialRecipientCount) {

View File

@@ -165,6 +165,11 @@ public class ProfileKeySendJob extends BaseJob {
e instanceof RetryLaterException;
}
@Override
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
return SendJobUtil.getBackoffMillisFromException(this, TAG, pastAttemptCount, exception, () -> super.getNextRunAttemptBackoff(pastAttemptCount, exception));
}
@Override
public @Nullable byte[] serialize() {
return new JsonJobData.Builder()

View File

@@ -9,14 +9,17 @@ import androidx.annotation.WorkerThread;
import com.annimon.stream.Collectors;
import com.annimon.stream.Stream;
import org.signal.core.models.ServiceId;
import org.signal.core.models.ServiceId.ACI;
import org.signal.core.util.Base64;
import org.signal.core.util.logging.Log;
import org.signal.storageservice.storage.protos.groups.local.DecryptedGroup;
import org.thoughtcrime.securesms.database.RecipientTable;
import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.messages.GroupSendUtil;
import org.thoughtcrime.securesms.mms.MessageGroupContext;
@@ -25,7 +28,6 @@ import org.thoughtcrime.securesms.net.NotPushRegisteredException;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.transport.RetryLaterException;
import org.signal.core.util.Base64;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.whispersystems.signalservice.api.crypto.ContentHint;
import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException;
@@ -33,8 +35,6 @@ import org.whispersystems.signalservice.api.groupsv2.DecryptedGroupUtil;
import org.whispersystems.signalservice.api.messages.SendMessageResult;
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceGroupV2;
import org.signal.core.models.ServiceId;
import org.signal.core.models.ServiceId.ACI;
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException;
import org.whispersystems.signalservice.internal.push.GroupContextV2;
@@ -168,6 +168,11 @@ public final class PushGroupSilentUpdateSendJob extends BaseJob {
e instanceof RetryLaterException;
}
@Override
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
return SendJobUtil.getBackoffMillisFromException(this, TAG, pastAttemptCount, exception, () -> super.getNextRunAttemptBackoff(pastAttemptCount, exception));
}
@Override
public void onFailure() {
Log.w(TAG, "Failed to send remote delete to all recipients! (" + (initialRecipientCount - recipients.size() + "/" + initialRecipientCount + ")") );

View File

@@ -72,6 +72,7 @@ import org.signal.core.models.ServiceId.ACI;
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException;
import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException;
import org.whispersystems.signalservice.api.push.exceptions.RateLimitException;
import org.whispersystems.signalservice.api.push.exceptions.RetryNetworkException;
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException;
import org.whispersystems.signalservice.internal.push.BodyRange;
@@ -160,29 +161,7 @@ public abstract class PushSendJob extends SendJob {
@Override
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
if (exception instanceof ProofRequiredException) {
long backoff = ((ProofRequiredException) exception).getRetryAfterSeconds();
warn(TAG, "[Proof Required] Retry-After is " + backoff + " seconds.");
if (backoff >= 0) {
return TimeUnit.SECONDS.toMillis(backoff);
}
} else if (exception instanceof RateLimitException) {
long backoff = ((RateLimitException) exception).getRetryAfterMilliseconds().orElse(-1L);
if (backoff >= 0) {
return backoff;
}
} else if (exception instanceof NonSuccessfulResponseCodeException) {
if (((NonSuccessfulResponseCodeException) exception).is5xx()) {
return BackoffUtil.exponentialBackoff(pastAttemptCount, RemoteConfig.getServerErrorMaxBackoff());
}
} else if (exception instanceof RetryLaterException) {
long backoff = ((RetryLaterException) exception).getBackoff();
if (backoff >= 0) {
return backoff;
}
}
return super.getNextRunAttemptBackoff(pastAttemptCount, exception);
return SendJobUtil.getBackoffMillisFromException(this, TAG, pastAttemptCount, exception, () -> super.getNextRunAttemptBackoff(pastAttemptCount, exception));
}
protected Optional<byte[]> getProfileKey(@NonNull Recipient recipient) {

View File

@@ -15,8 +15,8 @@ import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.ReactionRecord;
import org.thoughtcrime.securesms.dependencies.AppDependencies;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
@@ -193,6 +193,11 @@ public class ReactionSendJob extends BaseJob {
e instanceof RetryLaterException;
}
@Override
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
return SendJobUtil.getBackoffMillisFromException(this, TAG, pastAttemptCount, exception, () -> super.getNextRunAttemptBackoff(pastAttemptCount, exception));
}
@Override
public void onFailure() {
if (recipients.size() < initialRecipientCount) {

View File

@@ -7,6 +7,7 @@ import androidx.annotation.WorkerThread;
import com.annimon.stream.Stream;
import org.signal.core.util.SetUtil;
import org.signal.core.util.Util;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.database.MessageTable;
import org.thoughtcrime.securesms.database.NoSuchMessageException;
@@ -17,9 +18,9 @@ import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.MmsMessageRecord;
import org.thoughtcrime.securesms.dependencies.AppDependencies;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.impl.SealedSenderConstraint;
import org.thoughtcrime.securesms.messages.GroupSendUtil;
import org.thoughtcrime.securesms.net.NotPushRegisteredException;
@@ -28,7 +29,6 @@ import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.recipients.RecipientUtil;
import org.thoughtcrime.securesms.transport.RetryLaterException;
import org.thoughtcrime.securesms.util.GroupUtil;
import org.signal.core.util.Util;
import org.whispersystems.signalservice.api.crypto.ContentHint;
import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException;
import org.whispersystems.signalservice.api.messages.SendMessageResult;
@@ -198,6 +198,11 @@ public class RemoteDeleteSendJob extends BaseJob {
e instanceof RetryLaterException;
}
@Override
public long getNextRunAttemptBackoff(int pastAttemptCount, @NonNull Exception exception) {
return SendJobUtil.getBackoffMillisFromException(this, TAG, pastAttemptCount, exception, () -> super.getNextRunAttemptBackoff(pastAttemptCount, exception));
}
@Override
public void onFailure() {
Log.w(TAG, "Failed to send remote delete to all recipients! (" + (initialRecipientCount - recipients.size() + "/" + initialRecipientCount + ")") );

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
@file:JvmName("SendJobUtil")
package org.thoughtcrime.securesms.jobs
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JobLogger
import org.thoughtcrime.securesms.jobmanager.impl.BackoffUtil
import org.thoughtcrime.securesms.transport.RetryLaterException
import org.thoughtcrime.securesms.util.RemoteConfig.serverErrorMaxBackoff
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException
import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException
import org.whispersystems.signalservice.api.push.exceptions.RateLimitException
import org.whispersystems.signalservice.api.push.exceptions.RetryNetworkException
import java.util.concurrent.TimeUnit
import org.signal.libsignal.net.RetryLaterException as LibSignalRetryLaterException
fun Job.getBackoffMillisFromException(tag: String, pastAttemptCount: Int, exception: Exception, default: () -> Long): Long {
when (exception) {
is ProofRequiredException -> {
val backoff = exception.retryAfterSeconds
Log.w(tag, JobLogger.format(this, "[Proof Required] Retry-After is $backoff seconds."))
if (backoff >= 0) {
return TimeUnit.SECONDS.toMillis(backoff)
}
}
is RateLimitException -> {
val backoff = exception.retryAfterMilliseconds.orElse(-1L)
if (backoff >= 0) {
return backoff
}
}
is NonSuccessfulResponseCodeException -> {
if (exception.is5xx()) {
return BackoffUtil.exponentialBackoff(pastAttemptCount, serverErrorMaxBackoff)
}
}
is LibSignalRetryLaterException -> {
return exception.duration.toMillis()
}
is RetryNetworkException -> {
return exception.retryAfterMs
}
is RetryLaterException -> {
val backoff = exception.backoff
if (backoff >= 0) {
return backoff
}
}
}
return default()
}

View File

@@ -13,13 +13,9 @@ import org.whispersystems.signalservice.api.push.exceptions.RateLimitException
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException
import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException
import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse
import org.whispersystems.signalservice.internal.push.SendMessageResponse
import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException
import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException
import org.whispersystems.signalservice.internal.push.exceptions.InAppPaymentProcessorError
import org.whispersystems.signalservice.internal.push.exceptions.InAppPaymentReceiptCredentialError
import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException
import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException
import org.whispersystems.signalservice.internal.push.exceptions.PaymentsRegionException
import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException
@@ -117,44 +113,6 @@ object NetworkResultUtil {
}
}
/**
* Convert [NetworkResult] into expected type exceptions for a multi-recipient message send.
*/
@JvmStatic
@Throws(
InvalidUnidentifiedAccessHeaderException::class,
NotFoundException::class,
GroupMismatchedDevicesException::class,
GroupStaleDevicesException::class,
RateLimitException::class,
ServerRejectedException::class,
WebSocketUnavailableException::class,
IOException::class
)
fun toGroupMessageSendLegacy(result: NetworkResult<SendGroupMessageResponse>): SendGroupMessageResponse {
return when (result) {
is NetworkResult.Success -> result.result
is NetworkResult.ApplicationError -> {
throw when (val error = result.throwable) {
is IOException, is RuntimeException -> error
else -> RuntimeException(error)
}
}
is NetworkResult.NetworkError -> throw result.exception
is NetworkResult.StatusCodeError -> {
throw when (result.code) {
401 -> InvalidUnidentifiedAccessHeaderException()
404 -> NotFoundException("At least one unregistered user is message send.")
409 -> GroupMismatchedDevicesException(result.parseJsonBody())
410 -> GroupStaleDevicesException(result.parseJsonBody())
413, 429 -> throw RateLimitException(result.code, "Rate Limited", Optional.ofNullable(result.header("retry-after")?.toLongOrNull()?.seconds?.inWholeMilliseconds))
508 -> ServerRejectedException()
else -> result.exception
}
}
}
}
@JvmStatic
@Throws(IOException::class)
fun <T> toPreKeysLegacy(result: NetworkResult<T>): T {

View File

@@ -11,6 +11,13 @@ import org.signal.core.util.Base64;
import org.signal.core.util.ProtoUtil;
import org.signal.core.util.UuidUtil;
import org.signal.libsignal.metadata.certificate.SenderCertificate;
import org.signal.libsignal.net.MismatchedDeviceException;
import org.signal.libsignal.net.MultiRecipientMessageResponse;
import org.signal.libsignal.net.MultiRecipientSendAuthorization;
import org.signal.libsignal.net.MultiRecipientSendFailure;
import org.signal.libsignal.net.RequestResult;
import org.signal.libsignal.net.RequestUnauthorizedException;
import org.signal.libsignal.net.RetryLaterException;
import org.signal.libsignal.protocol.IdentityKey;
import org.signal.libsignal.protocol.IdentityKeyPair;
import org.signal.libsignal.protocol.InvalidKeyException;
@@ -39,6 +46,7 @@ import org.whispersystems.signalservice.api.crypto.UntrustedIdentityException;
import org.whispersystems.signalservice.api.groupsv2.GroupSendEndorsements;
import org.whispersystems.signalservice.api.keys.KeysApi;
import org.whispersystems.signalservice.api.message.MessageApi;
import org.whispersystems.signalservice.api.message.MessageApiKt;
import org.whispersystems.signalservice.api.messages.SendMessageResult;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachment;
import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentPointer;
@@ -75,11 +83,12 @@ import org.whispersystems.signalservice.api.push.DistributionId;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException;
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException;
import org.whispersystems.signalservice.api.push.exceptions.NotFoundException;
import org.whispersystems.signalservice.api.push.exceptions.ProofRequiredException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import org.whispersystems.signalservice.api.push.exceptions.RateLimitException;
import org.whispersystems.signalservice.api.push.exceptions.RetryNetworkException;
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException;
import org.whispersystems.signalservice.api.push.exceptions.UnknownGroupSendException;
import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException;
import org.whispersystems.signalservice.api.util.AttachmentPointerUtil;
import org.whispersystems.signalservice.api.util.CredentialsProvider;
@@ -97,8 +106,6 @@ import org.whispersystems.signalservice.internal.push.Content;
import org.whispersystems.signalservice.internal.push.DataMessage;
import org.whispersystems.signalservice.internal.push.EditMessage;
import org.whispersystems.signalservice.internal.push.GroupContextV2;
import org.whispersystems.signalservice.internal.push.GroupMismatchedDevices;
import org.whispersystems.signalservice.internal.push.GroupStaleDevices;
import org.whispersystems.signalservice.internal.push.MismatchedDevices;
import org.whispersystems.signalservice.internal.push.NullMessage;
import org.whispersystems.signalservice.internal.push.OutgoingPushMessage;
@@ -109,7 +116,6 @@ import org.whispersystems.signalservice.internal.push.ProvisioningVersion;
import org.whispersystems.signalservice.internal.push.PushAttachmentData;
import org.whispersystems.signalservice.internal.push.PushServiceSocket;
import org.whispersystems.signalservice.internal.push.ReceiptMessage;
import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse;
import org.whispersystems.signalservice.internal.push.SendMessageResponse;
import org.whispersystems.signalservice.internal.push.StaleDevices;
import org.whispersystems.signalservice.internal.push.StoryMessage;
@@ -117,8 +123,6 @@ import org.whispersystems.signalservice.internal.push.SyncMessage;
import org.whispersystems.signalservice.internal.push.TextAttachment;
import org.whispersystems.signalservice.internal.push.TypingMessage;
import org.whispersystems.signalservice.internal.push.Verified;
import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException;
import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException;
@@ -2558,52 +2562,53 @@ public class SignalServiceMessageSender {
sendEvents.onMessageEncrypted();
try {
try {
MultiRecipientSendAuthorization multiRecipientAuth = story ? MultiRecipientSendAuthorization.Story.INSTANCE
: new MultiRecipientSendAuthorization.GroupSend(groupSendEndorsements.toFullToken());
SendGroupMessageResponse response = NetworkResultUtil.toGroupMessageSendLegacy(messageApi.sendGroupMessage(ciphertext, sealedSenderAccess, timestamp, online, urgent, story));
return transformGroupResponseToMessageResults(targetInfo.devices, response, content);
} catch (InvalidUnidentifiedAccessHeaderException |
NotFoundException |
GroupMismatchedDevicesException |
GroupStaleDevicesException |
ServerRejectedException |
RateLimitException e) {
// Non-technical failures shouldn't be retried with socket
throw e;
} catch (WebSocketUnavailableException e) {
if (useRestFallback.getAsBoolean()) {
Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} else {
Log.i(TAG, "[sendGroupMessage][" + timestamp + "] Pipe unavailable (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
throw e;
}
} catch (IOException e) {
if (useRestFallback.getAsBoolean()) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed, falling back... (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
} else {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Pipe failed (" + e.getClass().getSimpleName() + ": " + e.getMessage() + ")");
throw e;
}
}
RequestResult<MultiRecipientMessageResponse, MultiRecipientSendFailure> result = messageApi.sendGroupMessage(ciphertext, multiRecipientAuth, timestamp, online, urgent);
SendGroupMessageResponse response = socket.sendGroupMessage(ciphertext, sealedSenderAccess, timestamp, online, urgent, story);
return transformGroupResponseToMessageResults(targetInfo.devices, response, content);
} catch (GroupMismatchedDevicesException e) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices. (" + e.getMessage() + ")");
for (GroupMismatchedDevices mismatched : e.getMismatchedDevices()) {
SignalServiceAddress address = new SignalServiceAddress(ServiceId.parseOrThrow(mismatched.getUuid()), Optional.empty());
handleMismatchedDevices(address, mismatched.getDevices());
if (result instanceof RequestResult.Success) {
MultiRecipientMessageResponse response = ((RequestResult.Success<MultiRecipientMessageResponse>) result).getResult();
return transformGroupResponseToMessageResults(targetInfo.devices, MessageApiKt.unsentTargets(response), content);
} else if (result instanceof RequestResult.NonSuccess) {
MultiRecipientSendFailure error = ((RequestResult.NonSuccess<MultiRecipientSendFailure>) result).getError();
if (error instanceof MismatchedDeviceException) {
MismatchedDeviceException mismatchedDeviceException = (MismatchedDeviceException) error;
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling mismatched devices. (" + mismatchedDeviceException.getMessage() + ")");
for (MismatchedDeviceException.Entry entry : mismatchedDeviceException.getEntries()) {
SignalServiceAddress address = new SignalServiceAddress(ServiceId.fromLibSignal(entry.getAccount()));
MismatchedDevices devices = MismatchedDevices.fromLibSignal(entry);
handleMismatchedDevices(address, devices);
if (entry.getStaleDevices().length > 0) {
StaleDevices staleDevices = StaleDevices.fromLibSignal(entry);
handleStaleDevices(address, staleDevices);
}
}
} else if (error instanceof RequestUnauthorizedException) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Invalid access header.");
throw new InvalidUnidentifiedAccessHeaderException();
} else {
throw new IOException("Unknown multi-recipient send failure: " + error.getClass().getSimpleName());
}
} catch (GroupStaleDevicesException e) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Handling stale devices. (" + e.getMessage() + ")");
for (GroupStaleDevices stale : e.getStaleDevices()) {
SignalServiceAddress address = new SignalServiceAddress(ServiceId.parseOrThrow(stale.getUuid()), Optional.empty());
handleStaleDevices(address, stale.getDevices());
} else if (result instanceof RequestResult.RetryableNetworkError) {
RequestResult.RetryableNetworkError retryableError = (RequestResult.RetryableNetworkError) result;
IOException exception = retryableError.getNetworkError();
if (exception instanceof RetryLaterException) {
throw exception;
} else if (retryableError.getRetryAfter() != null && retryableError.getRetryAfter().toMillis() > 0) {
throw new RetryNetworkException(retryableError.getRetryAfter().toMillis(), exception);
} else {
throw exception;
}
} else if (result instanceof RequestResult.ApplicationError) {
Throwable cause = ((RequestResult.ApplicationError) result).getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new UnknownGroupSendException(cause);
}
} catch (InvalidUnidentifiedAccessHeaderException e) {
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Invalid access header. (" + e.getMessage() + ")");
throw e;
}
Log.w(TAG, "[sendGroupMessage][" + timestamp + "] Attempt failed (i = " + i + ")");
@@ -2656,9 +2661,7 @@ public class SignalServiceMessageSender {
}
}
private List<SendMessageResult> transformGroupResponseToMessageResults(Map<SignalServiceAddress, List<Integer>> recipients, SendGroupMessageResponse response, Content content) {
Set<ServiceId> unregistered = response.getUnsentTargets();
private List<SendMessageResult> transformGroupResponseToMessageResults(Map<SignalServiceAddress, List<Integer>> recipients, Set<ServiceId> unregistered, Content content) {
List<SendMessageResult> failures = unregistered.stream()
.map(SignalServiceAddress::new)
.map(SendMessageResult::unregisteredFailure)

View File

@@ -26,8 +26,12 @@ data class GroupSendEndorsements(
private val expiration: Instant by lazy { Instant.ofEpochMilli(expirationMs) }
private val combinedEndorsement: GroupSendEndorsement by lazy { GroupSendEndorsement.combine(endorsements.values) }
fun toFullToken(): GroupSendFullToken {
return combinedEndorsement.toFullToken(groupSecretParams, expiration)
}
fun serialize(): ByteArray {
return combinedEndorsement.toFullToken(groupSecretParams, expiration).serialize()
return toFullToken().serialize()
}
fun forIndividuals(addresses: List<SignalServiceAddress>): List<GroupSendFullToken?> {

View File

@@ -5,16 +5,21 @@
package org.whispersystems.signalservice.api.message
import kotlinx.coroutines.runBlocking
import org.signal.core.models.ServiceId
import org.signal.libsignal.net.MultiRecipientMessageResponse
import org.signal.libsignal.net.MultiRecipientSendAuthorization
import org.signal.libsignal.net.MultiRecipientSendFailure
import org.signal.libsignal.net.RequestResult
import org.signal.libsignal.net.UnauthMessagesService
import org.signal.libsignal.net.getOrError
import org.whispersystems.signalservice.api.NetworkResult
import org.whispersystems.signalservice.api.crypto.SealedSenderAccess
import org.whispersystems.signalservice.api.websocket.SignalWebSocket
import org.whispersystems.signalservice.internal.post
import org.whispersystems.signalservice.internal.push.OutgoingPushMessageList
import org.whispersystems.signalservice.internal.push.SendGroupMessageResponse
import org.whispersystems.signalservice.internal.push.SendMessageResponse
import org.whispersystems.signalservice.internal.put
import org.whispersystems.signalservice.internal.putCustom
import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage
import org.whispersystems.signalservice.internal.websocket.WebsocketResponse
@@ -70,24 +75,14 @@ class MessageApi(
}
/**
* Sends a common message to multiple recipients and requires some form of [sealedSenderAccess] unless it's a story.
*
* PUT /v1/messages/multi_recipient?ts=[timestamp]&online=[online]&urgent=[urgent]&story=[story]
* - 200: Success
* - 400: Message specified delivery to the same recipient device multiple times
* - 401: Message is not a story and [sealedSenderAccess] is missing or incorrect
* - 404: Message is not a story and some of the recipients are not registered Signal users
* - 409: Incorrect set of devices supplied for some recipients
* - 410: Stale devices supplied for some recipients
* Sends a common message to multiple recipients using the libsignal-net [UnauthMessagesService].
*/
fun sendGroupMessage(body: ByteArray, sealedSenderAccess: SealedSenderAccess, timestamp: Long, online: Boolean, urgent: Boolean, story: Boolean): NetworkResult<SendGroupMessageResponse> {
val request = WebSocketRequestMessage.putCustom(
path = "/v1/messages/multi_recipient?ts=$timestamp&online=${online.toQueryParam()}&urgent=${urgent.toQueryParam()}&story=${story.toQueryParam()}",
body = body,
headers = mapOf("content-type" to "application/vnd.signal-messenger.mrm")
)
return NetworkResult.fromWebSocket { unauthWebSocket.request(request, sealedSenderAccess) }
fun sendGroupMessage(body: ByteArray, auth: MultiRecipientSendAuthorization, timestamp: Long, online: Boolean, urgent: Boolean): RequestResult<MultiRecipientMessageResponse, MultiRecipientSendFailure> {
return runBlocking {
unauthWebSocket.runCatchingWithUnauthChatConnection { chatConnection ->
UnauthMessagesService(chatConnection).sendMultiRecipientMessage(body, timestamp, auth, online, urgent)
}.getOrError()
}
}
/**
@@ -103,3 +98,7 @@ class MessageApi(
private fun Boolean.toQueryParam(): String = if (this) "true" else "false"
}
fun MultiRecipientMessageResponse.unsentTargets(): Set<ServiceId> {
return unregisteredIds.mapTo(HashSet(unregisteredIds.size)) { ServiceId.fromLibSignal(it) }
}

View File

@@ -0,0 +1,13 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.api.push.exceptions
import java.io.IOException
/**
* Wraps an exception that does not hold retry after data so that it *can* have retry after data.
*/
class RetryNetworkException(val retryAfterMs: Long, cause: Throwable) : IOException(cause)

View File

@@ -0,0 +1,12 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.api.push.exceptions
/**
* Wraps a [org.signal.libsignal.net.RequestResult.ApplicationError]'s cause in a named
* [RuntimeException] so it is more identifiable.
*/
class UnknownGroupSendException(cause: Throwable) : RuntimeException(cause)

View File

@@ -1,13 +1,15 @@
/**
* Copyright (C) 2014-2016 Open Whisper Systems
*
* Licensed according to the LICENSE file in this repository.
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.internal.push;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.signal.libsignal.net.MismatchedDeviceException;
import java.util.ArrayList;
import java.util.List;
public class MismatchedDevices {
@@ -24,4 +26,21 @@ public class MismatchedDevices {
public List<Integer> getExtraDevices() {
return extraDevices;
}
public static MismatchedDevices fromLibSignal(MismatchedDeviceException.Entry entry) {
MismatchedDevices result = new MismatchedDevices();
result.missingDevices = toList(entry.getMissingDevices());
result.extraDevices = toList(entry.getExtraDevices());
return result;
}
private static List<Integer> toList(int[] array) {
List<Integer> list = new ArrayList<>(array.length);
for (int value : array) {
list.add(value);
}
return list;
}
}

View File

@@ -15,11 +15,11 @@ import org.signal.core.util.Hex;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.signal.libsignal.protocol.logging.Log;
import org.signal.storageservice.storage.protos.groups.AvatarUploadAttributes;
import org.signal.storageservice.storage.protos.groups.ExternalGroupCredential;
import org.signal.storageservice.storage.protos.groups.Group;
import org.signal.storageservice.storage.protos.groups.GroupChange;
import org.signal.storageservice.storage.protos.groups.GroupChangeResponse;
import org.signal.storageservice.storage.protos.groups.GroupChanges;
import org.signal.storageservice.storage.protos.groups.ExternalGroupCredential;
import org.signal.storageservice.storage.protos.groups.GroupJoinInfo;
import org.signal.storageservice.storage.protos.groups.GroupResponse;
import org.signal.storageservice.storage.protos.groups.Member;
@@ -79,11 +79,8 @@ import org.whispersystems.signalservice.internal.configuration.SignalUrl;
import org.whispersystems.signalservice.internal.crypto.AttachmentDigest;
import org.whispersystems.signalservice.internal.push.exceptions.ForbiddenException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupExistsException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupMismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupNotFoundException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupPatchNotAcceptedException;
import org.whispersystems.signalservice.internal.push.exceptions.GroupStaleDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.InvalidUnidentifiedAccessHeaderException;
import org.whispersystems.signalservice.internal.push.exceptions.MismatchedDevicesException;
import org.whispersystems.signalservice.internal.push.exceptions.NotInGroupException;
import org.whispersystems.signalservice.internal.push.exceptions.StaleDevicesException;
@@ -135,12 +132,10 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import kotlin.Pair;
import okhttp3.Call;
import okhttp3.ConnectionPool;
import okhttp3.ConnectionSpec;
import okhttp3.Dns;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.MediaType;
@@ -533,62 +528,6 @@ public class PushServiceSocket {
return JsonUtil.fromJson(responseText, RegisterAsSecondaryDeviceResponse.class);
}
public SendGroupMessageResponse sendGroupMessage(byte[] body, @Nonnull SealedSenderAccess sealedSenderAccess, long timestamp, boolean online, boolean urgent, boolean story)
throws NonSuccessfulResponseCodeException, PushNetworkException, MalformedResponseException
{
ServiceConnectionHolder connectionHolder = (ServiceConnectionHolder) getRandom(serviceClients, random);
String path = String.format(Locale.US, GROUP_MESSAGE_PATH, timestamp, online, urgent, story);
Request.Builder requestBuilder = new Request.Builder();
requestBuilder.url(String.format("%s%s", connectionHolder.getUrl(), path));
requestBuilder.put(RequestBody.create(MediaType.get("application/vnd.signal-messenger.mrm"), body));
requestBuilder.addHeader(sealedSenderAccess.getHeaderName(), sealedSenderAccess.getHeaderValue());
if (signalAgent != null) {
requestBuilder.addHeader("X-Signal-Agent", signalAgent);
}
if (connectionHolder.getHostHeader().isPresent()) {
requestBuilder.addHeader("Host", connectionHolder.getHostHeader().get());
}
Call call = connectionHolder.getUnidentifiedClient().newCall(requestBuilder.build());
synchronized (connections) {
connections.add(call);
}
try (Response response = call.execute()) {
switch (response.code()) {
case 200:
return readBodyJson(response.body(), SendGroupMessageResponse.class);
case 401:
throw new InvalidUnidentifiedAccessHeaderException();
case 404:
throw new NotFoundException("At least one unregistered user in message send.");
case 409:
GroupMismatchedDevices[] mismatchedDevices = readBodyJson(response.body(), GroupMismatchedDevices[].class);
throw new GroupMismatchedDevicesException(mismatchedDevices);
case 410:
GroupStaleDevices[] staleDevices = readBodyJson(response.body(), GroupStaleDevices[].class);
throw new GroupStaleDevicesException(staleDevices);
case 508:
throw new ServerRejectedException();
default:
throw new NonSuccessfulResponseCodeException(response.code());
}
} catch (PushNetworkException | NonSuccessfulResponseCodeException | MalformedResponseException e) {
throw e;
} catch (IOException e) {
throw new PushNetworkException(e);
} finally {
synchronized (connections) {
connections.remove(call);
}
}
}
public SendMessageResponse sendMessage(OutgoingPushMessageList bundle, @Nullable SealedSenderAccess sealedSenderAccess, boolean story)
throws IOException
{

View File

@@ -1,36 +0,0 @@
package org.whispersystems.signalservice.internal.push;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.signal.libsignal.protocol.logging.Log;
import org.signal.core.models.ServiceId;
import java.util.HashSet;
import java.util.Set;
public class SendGroupMessageResponse {
private static final String TAG = SendGroupMessageResponse.class.getSimpleName();
// Contains serialized ServiceIds
@JsonProperty
private String[] uuids404;
public SendGroupMessageResponse() {}
public Set<ServiceId> getUnsentTargets() {
String[] uuids = uuids404 != null ? uuids404 : new String[0];
Set<ServiceId> serviceIds = new HashSet<>(uuids.length);
for (String raw : uuids) {
ServiceId parsed = ServiceId.parseOrNull(raw);
if (parsed != null) {
serviceIds.add(parsed);
} else {
Log.w(TAG, "Failed to parse ServiceId!");
}
}
return serviceIds;
}
}

View File

@@ -1,13 +1,15 @@
/**
* Copyright (C) 2014-2016 Open Whisper Systems
*
* Licensed according to the LICENSE file in this repository.
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.signalservice.internal.push;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.signal.libsignal.net.MismatchedDeviceException;
import java.util.ArrayList;
import java.util.List;
public class StaleDevices {
@@ -18,4 +20,15 @@ public class StaleDevices {
public List<Integer> getStaleDevices() {
return staleDevices;
}
public static StaleDevices fromLibSignal(MismatchedDeviceException.Entry entry) {
StaleDevices result = new StaleDevices();
int[] stale = entry.getStaleDevices();
List<Integer> list = new ArrayList<>(stale.length);
for (int value : stale) {
list.add(value);
}
result.staleDevices = list;
return result;
}
}