mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 17:28:04 +01:00
Explicitly pass sync message sender device ID as an argument to sendMessage
This commit is contained in:
committed by
Jon Chambers
parent
d6bc2765b6
commit
aa5fd52302
@@ -436,11 +436,16 @@ public class MessageController {
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId = messages.messages().stream()
|
||||
.collect(Collectors.toMap(IncomingMessage::destinationDeviceId, IncomingMessage::destinationRegistrationId));
|
||||
|
||||
final Optional<Byte> syncMessageSenderDeviceId = messageType == MessageType.SYNC
|
||||
? Optional.ofNullable(sender).map(authenticatedDevice -> authenticatedDevice.getAuthenticatedDevice().getId())
|
||||
: Optional.empty();
|
||||
|
||||
try {
|
||||
messageSender.sendMessages(destination,
|
||||
destinationIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId,
|
||||
syncMessageSenderDeviceId,
|
||||
userAgent);
|
||||
} catch (final MismatchedDevicesException e) {
|
||||
if (!e.getMismatchedDevices().staleDeviceIds().isEmpty()) {
|
||||
|
||||
@@ -187,7 +187,8 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
destination,
|
||||
destinationServiceIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId);
|
||||
registrationIdsByDeviceId,
|
||||
Optional.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.grpc;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusException;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.signal.chat.messages.MismatchedDevices;
|
||||
import org.signal.chat.messages.SendMessageResponse;
|
||||
import org.whispersystems.textsecuregcm.controllers.MismatchedDevicesException;
|
||||
@@ -31,6 +32,8 @@ public class MessagesGrpcHelper {
|
||||
* @param destinationServiceIdentifier the service identifier for the destination account
|
||||
* @param messagesByDeviceId a map of device IDs to message payloads
|
||||
* @param registrationIdsByDeviceId a map of device IDs to device registration IDs
|
||||
* @param syncMessageSenderDeviceId if the message is a sync message (i.e. a message to other devices linked to the
|
||||
* caller's own account), contains the ID of the device that sent the message
|
||||
*
|
||||
* @return a response object to send to callers
|
||||
*
|
||||
@@ -42,13 +45,16 @@ public class MessagesGrpcHelper {
|
||||
final Account destination,
|
||||
final ServiceIdentifier destinationServiceIdentifier,
|
||||
final Map<Byte, MessageProtos.Envelope> messagesByDeviceId,
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId) throws StatusException, RateLimitExceededException {
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId,
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") final Optional<Byte> syncMessageSenderDeviceId)
|
||||
throws StatusException, RateLimitExceededException {
|
||||
|
||||
try {
|
||||
messageSender.sendMessages(destination,
|
||||
destinationServiceIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId,
|
||||
syncMessageSenderDeviceId,
|
||||
RequestAttributesUtil.getRawUserAgent().orElse(null));
|
||||
|
||||
return SEND_MESSAGE_SUCCESS_RESPONSE;
|
||||
|
||||
@@ -172,7 +172,8 @@ public class MessagesGrpcService extends SimpleMessagesGrpc.MessagesImplBase {
|
||||
destination,
|
||||
destinationServiceIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId);
|
||||
registrationIdsByDeviceId,
|
||||
messageType == MessageType.SYNC ? Optional.of(sender.deviceId()) : Optional.empty());
|
||||
}
|
||||
|
||||
private static MessageProtos.Envelope.Type getEnvelopeType(final AuthenticatedSenderMessageType type) {
|
||||
|
||||
@@ -13,7 +13,6 @@ import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.Tags;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
@@ -21,6 +20,7 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
|
||||
import org.signal.libsignal.protocol.util.Pair;
|
||||
@@ -36,7 +36,6 @@ import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* A MessageSender sends Signal messages to destination devices. Messages may be "normal" user-to-user messages,
|
||||
@@ -86,6 +85,8 @@ public class MessageSender {
|
||||
* @param destinationIdentifier the service identifier to which the messages are addressed
|
||||
* @param messagesByDeviceId a map of device IDs to message payloads
|
||||
* @param registrationIdsByDeviceId a map of device IDs to device registration IDs
|
||||
* @param syncMessageSenderDeviceId if the message is a sync message (i.e. a message to other devices linked to the
|
||||
* caller's own account), contains the ID of the device that sent the message
|
||||
* @param userAgent the User-Agent string for the sender; may be {@code null} if not known
|
||||
*
|
||||
* @throws MismatchedDevicesException if the given bundle of messages did not include a message for all required
|
||||
@@ -97,38 +98,55 @@ public class MessageSender {
|
||||
final ServiceIdentifier destinationIdentifier,
|
||||
final Map<Byte, Envelope> messagesByDeviceId,
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId,
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") final Optional<Byte> syncMessageSenderDeviceId,
|
||||
@Nullable final String userAgent) throws MismatchedDevicesException, MessageTooLargeException {
|
||||
|
||||
if (messagesByDeviceId.isEmpty()) {
|
||||
// TODO Simply return and don't throw an exception when iOS clients no longer depend on this behavior
|
||||
throw new MismatchedDevicesException(new MismatchedDevices(
|
||||
destination.getDevices().stream().map(Device::getId).collect(Collectors.toSet()),
|
||||
Collections.emptySet(),
|
||||
Collections.emptySet()));
|
||||
}
|
||||
|
||||
if (!destination.isIdentifiedBy(destinationIdentifier)) {
|
||||
throw new IllegalArgumentException("Destination account not identified by destination service identifier");
|
||||
}
|
||||
|
||||
final Envelope firstMessage = messagesByDeviceId.values().iterator().next();
|
||||
final boolean isSyncMessage;
|
||||
final boolean isStory;
|
||||
final byte excludedDeviceId;
|
||||
|
||||
final boolean isSyncMessage = StringUtils.isNotBlank(firstMessage.getSourceServiceId()) &&
|
||||
destination.isIdentifiedBy(ServiceIdentifier.valueOf(firstMessage.getSourceServiceId()));
|
||||
if (syncMessageSenderDeviceId.isPresent()) {
|
||||
if (messagesByDeviceId.values().stream().anyMatch(message -> StringUtils.isBlank(message.getSourceServiceId()) ||
|
||||
!destination.isIdentifiedBy(ServiceIdentifier.valueOf(message.getSourceServiceId())))) {
|
||||
|
||||
final boolean isStory = firstMessage.getStory();
|
||||
throw new IllegalArgumentException("Sync message sender device ID specified, but one or more messages are not addressed to sender");
|
||||
}
|
||||
|
||||
validateIndividualMessageContentLength(messagesByDeviceId.values(), isSyncMessage, isStory, userAgent);
|
||||
isSyncMessage = true;
|
||||
isStory = false;
|
||||
excludedDeviceId = syncMessageSenderDeviceId.get();
|
||||
} else {
|
||||
if (messagesByDeviceId.values().stream().anyMatch(message -> StringUtils.isNotBlank(message.getSourceServiceId()) &&
|
||||
destination.isIdentifiedBy(ServiceIdentifier.valueOf(message.getSourceServiceId())))) {
|
||||
|
||||
throw new IllegalArgumentException("Sync message sender device ID not specified, but one or more messages are addressed to sender");
|
||||
}
|
||||
|
||||
isSyncMessage = false;
|
||||
excludedDeviceId = NO_EXCLUDED_DEVICE_ID;
|
||||
|
||||
// It's technically possible that the caller tried to send a story with an empty message list, in which case we'd
|
||||
// incorrectly set this to `false`, but the mismatched device check will throw an exception before that matters.
|
||||
isStory = messagesByDeviceId.values().stream().findAny()
|
||||
.map(Envelope::getStory)
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
final Optional<MismatchedDevices> maybeMismatchedDevices = getMismatchedDevices(destination,
|
||||
destinationIdentifier,
|
||||
registrationIdsByDeviceId,
|
||||
isSyncMessage ? (byte) firstMessage.getSourceDevice() : NO_EXCLUDED_DEVICE_ID);
|
||||
excludedDeviceId);
|
||||
|
||||
if (maybeMismatchedDevices.isPresent()) {
|
||||
throw new MismatchedDevicesException(maybeMismatchedDevices.get());
|
||||
}
|
||||
|
||||
validateIndividualMessageContentLength(messagesByDeviceId.values(), isSyncMessage, isStory, userAgent);
|
||||
|
||||
messagesManager.insert(destination.getIdentifier(IdentityType.ACI), messagesByDeviceId)
|
||||
.forEach((deviceId, destinationPresent) -> {
|
||||
final Envelope message = messagesByDeviceId.get(deviceId);
|
||||
|
||||
@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.push;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
@@ -70,6 +71,7 @@ public class ReceiptSender {
|
||||
destinationIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId,
|
||||
Optional.empty(),
|
||||
UserAgentTagUtil.SERVER_UA);
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Could not send delivery receipt", e);
|
||||
|
||||
@@ -7,6 +7,7 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
import com.google.protobuf.ByteString;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
@@ -116,7 +117,12 @@ public class ChangeNumberManager {
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId = account.getDevices().stream()
|
||||
.collect(Collectors.toMap(Device::getId, Device::getRegistrationId));
|
||||
|
||||
messageSender.sendMessages(account, serviceIdentifier, messagesByDeviceId, registrationIdsByDeviceId, senderUserAgent);
|
||||
messageSender.sendMessages(account,
|
||||
serviceIdentifier,
|
||||
messagesByDeviceId,
|
||||
registrationIdsByDeviceId,
|
||||
Optional.of(Device.PRIMARY_ID),
|
||||
senderUserAgent);
|
||||
} catch (final RuntimeException e) {
|
||||
logger.warn("Changed number but could not send all device messages on {}", account.getUuid(), e);
|
||||
throw e;
|
||||
|
||||
Reference in New Issue
Block a user