mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 23:48:01 +01:00
Implement story sending via gRPC
This commit is contained in:
committed by
Jon Chambers
parent
37c4a0451a
commit
caa81b4885
@@ -5,7 +5,6 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.grpc;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusException;
|
||||
import java.time.Clock;
|
||||
@@ -14,12 +13,15 @@ import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.signal.chat.messages.IndividualRecipientMessageBundle;
|
||||
import org.signal.chat.messages.MismatchedDevices;
|
||||
import org.signal.chat.messages.MultiRecipientMismatchedDevices;
|
||||
import org.signal.chat.messages.SendMessageResponse;
|
||||
import org.signal.chat.messages.SendMultiRecipientMessageRequest;
|
||||
import org.signal.chat.messages.SendMultiRecipientMessageResponse;
|
||||
import org.signal.chat.messages.SendMultiRecipientStoryRequest;
|
||||
import org.signal.chat.messages.SendSealedSenderMessageRequest;
|
||||
import org.signal.chat.messages.SendStoryMessageRequest;
|
||||
import org.signal.chat.messages.SimpleMessagesAnonymousGrpc;
|
||||
import org.signal.libsignal.protocol.InvalidMessageException;
|
||||
import org.signal.libsignal.protocol.InvalidVersionException;
|
||||
@@ -29,6 +31,7 @@ import org.whispersystems.textsecuregcm.controllers.MismatchedDevicesException;
|
||||
import org.whispersystems.textsecuregcm.controllers.MultiRecipientMismatchedDevicesException;
|
||||
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.limits.CardinalityEstimator;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
@@ -99,8 +102,50 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
case AUTHORIZATION_NOT_SET -> throw Status.UNAUTHENTICATED.asException();
|
||||
}
|
||||
|
||||
return sendIndividualMessage(destination,
|
||||
destinationServiceIdentifier,
|
||||
request.getMessages(),
|
||||
request.getEphemeral(),
|
||||
request.getUrgent(),
|
||||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SendMessageResponse sendStory(final SendStoryMessageRequest request)
|
||||
throws StatusException, RateLimitExceededException {
|
||||
|
||||
final ServiceIdentifier destinationServiceIdentifier =
|
||||
ServiceIdentifierUtil.fromGrpcServiceIdentifier(request.getDestination());
|
||||
|
||||
final Optional<Account> maybeDestination = accountsManager.getByServiceIdentifier(destinationServiceIdentifier);
|
||||
|
||||
if (maybeDestination.isEmpty()) {
|
||||
// Don't reveal to unauthenticated callers whether a destination account actually exists
|
||||
return SEND_MESSAGE_SUCCESS_RESPONSE;
|
||||
}
|
||||
|
||||
final Account destination = maybeDestination.get();
|
||||
|
||||
rateLimiters.getStoriesLimiter().validate(destination.getIdentifier(IdentityType.ACI));
|
||||
|
||||
return sendIndividualMessage(destination,
|
||||
destinationServiceIdentifier,
|
||||
request.getMessages(),
|
||||
false,
|
||||
request.getUrgent(),
|
||||
true);
|
||||
}
|
||||
|
||||
private SendMessageResponse sendIndividualMessage(final Account destination,
|
||||
final ServiceIdentifier destinationServiceIdentifier,
|
||||
final IndividualRecipientMessageBundle messages,
|
||||
final boolean ephemeral,
|
||||
final boolean urgent,
|
||||
final boolean story) throws StatusException, RateLimitExceededException {
|
||||
|
||||
final SpamCheckResult<GrpcResponse<SendMessageResponse>> spamCheckResult =
|
||||
spamChecker.checkForIndividualRecipientSpamGrpc(MessageType.INDIVIDUAL_SEALED_SENDER,
|
||||
spamChecker.checkForIndividualRecipientSpamGrpc(
|
||||
story ? MessageType.INDIVIDUAL_STORY : MessageType.INDIVIDUAL_SEALED_SENDER,
|
||||
Optional.empty(),
|
||||
Optional.of(destination),
|
||||
destinationServiceIdentifier);
|
||||
@@ -110,7 +155,7 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
}
|
||||
|
||||
try {
|
||||
final int totalPayloadLength = request.getMessages().getMessagesMap().values().stream()
|
||||
final int totalPayloadLength = messages.getMessagesMap().values().stream()
|
||||
.mapToInt(message -> message.getPayload().size())
|
||||
.sum();
|
||||
|
||||
@@ -120,28 +165,23 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
throw e;
|
||||
}
|
||||
|
||||
final Map<Byte, MessageProtos.Envelope> messagesByDeviceId = request.getMessages().getMessagesMap().entrySet()
|
||||
final Map<Byte, MessageProtos.Envelope> messagesByDeviceId = messages.getMessagesMap().entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
entry -> DeviceIdUtil.validate(entry.getKey()),
|
||||
entry -> {
|
||||
final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder()
|
||||
.setType(MessageProtos.Envelope.Type.UNIDENTIFIED_SENDER)
|
||||
.setClientTimestamp(request.getMessages().getTimestamp())
|
||||
.setServerTimestamp(clock.millis())
|
||||
.setDestinationServiceId(destinationServiceIdentifier.toServiceIdentifierString())
|
||||
.setEphemeral(request.getEphemeral())
|
||||
.setUrgent(request.getUrgent())
|
||||
.setContent(entry.getValue().getPayload());
|
||||
|
||||
spamCheckResult.token()
|
||||
.ifPresent(token -> envelopeBuilder.setReportSpamToken(ByteString.copyFrom(token)));
|
||||
|
||||
return envelopeBuilder.build();
|
||||
}
|
||||
entry -> MessageProtos.Envelope.newBuilder()
|
||||
.setType(MessageProtos.Envelope.Type.UNIDENTIFIED_SENDER)
|
||||
.setClientTimestamp(messages.getTimestamp())
|
||||
.setServerTimestamp(clock.millis())
|
||||
.setDestinationServiceId(destinationServiceIdentifier.toServiceIdentifierString())
|
||||
.setEphemeral(ephemeral)
|
||||
.setUrgent(urgent)
|
||||
.setStory(story)
|
||||
.setContent(entry.getValue().getPayload())
|
||||
.build()
|
||||
));
|
||||
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId = request.getMessages().getMessagesMap().entrySet().stream()
|
||||
final Map<Byte, Integer> registrationIdsByDeviceId = messages.getMessagesMap().entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
entry -> entry.getKey().byteValue(),
|
||||
entry -> entry.getValue().getRegistrationId()));
|
||||
@@ -167,36 +207,47 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
public SendMultiRecipientMessageResponse sendMultiRecipientMessage(final SendMultiRecipientMessageRequest request)
|
||||
throws StatusException {
|
||||
|
||||
final SealedSenderMultiRecipientMessage multiRecipientMessage;
|
||||
|
||||
try {
|
||||
multiRecipientMessage = SealedSenderMultiRecipientMessage.parse(request.getMessage().getPayload().toByteArray());
|
||||
} catch (final InvalidMessageException | InvalidVersionException e) {
|
||||
throw Status.INVALID_ARGUMENT.withCause(e).asException();
|
||||
}
|
||||
|
||||
// Check that the request is well-formed and doesn't contain repeated entries for the same device for the same
|
||||
// recipient
|
||||
{
|
||||
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID];
|
||||
|
||||
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
|
||||
Arrays.fill(usedDeviceIds, false);
|
||||
|
||||
for (final byte deviceId : recipient.getDevices()) {
|
||||
if (usedDeviceIds[deviceId]) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Request contains repeated device entries").asException();
|
||||
}
|
||||
|
||||
usedDeviceIds[deviceId] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
final SealedSenderMultiRecipientMessage multiRecipientMessage =
|
||||
parseAndValidateMultiRecipientMessage(request.getMessage().getPayload().toByteArray());
|
||||
|
||||
groupSendTokenUtil.checkGroupSendToken(request.getGroupSendToken(), multiRecipientMessage.getRecipients().keySet());
|
||||
|
||||
return sendMultiRecipientMessage(multiRecipientMessage,
|
||||
request.getMessage().getTimestamp(),
|
||||
request.getEphemeral(),
|
||||
request.getUrgent(),
|
||||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SendMultiRecipientMessageResponse sendMultiRecipientStory(final SendMultiRecipientStoryRequest request)
|
||||
throws StatusException {
|
||||
|
||||
final SealedSenderMultiRecipientMessage multiRecipientMessage =
|
||||
parseAndValidateMultiRecipientMessage(request.getMessage().getPayload().toByteArray());
|
||||
|
||||
return sendMultiRecipientMessage(multiRecipientMessage,
|
||||
request.getMessage().getTimestamp(),
|
||||
false,
|
||||
request.getUrgent(),
|
||||
true)
|
||||
.toBuilder()
|
||||
// Don't identify unresolved recipients for stories
|
||||
.clearUnresolvedRecipients()
|
||||
.build();
|
||||
}
|
||||
|
||||
private SendMultiRecipientMessageResponse sendMultiRecipientMessage(
|
||||
final SealedSenderMultiRecipientMessage multiRecipientMessage,
|
||||
final long timestamp,
|
||||
final boolean ephemeral,
|
||||
final boolean urgent,
|
||||
final boolean story) throws StatusException {
|
||||
|
||||
final SpamCheckResult<GrpcResponse<SendMultiRecipientMessageResponse>> spamCheckResult =
|
||||
spamChecker.checkForMultiRecipientSpamGrpc(MessageType.MULTI_RECIPIENT_SEALED_SENDER);
|
||||
spamChecker.checkForMultiRecipientSpamGrpc(story
|
||||
? MessageType.MULTI_RECIPIENT_STORY
|
||||
: MessageType.MULTI_RECIPIENT_SEALED_SENDER);
|
||||
|
||||
if (spamCheckResult.response().isPresent()) {
|
||||
return spamCheckResult.response().get().getResponseOrThrowStatus();
|
||||
@@ -205,26 +256,15 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
// At this point, the caller has at least superficially provided the information needed to send a multi-recipient
|
||||
// message. Attempt to resolve the destination service identifiers to Signal accounts.
|
||||
final Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolvedRecipients =
|
||||
Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
|
||||
.flatMap(serviceIdAndRecipient -> {
|
||||
final ServiceIdentifier serviceIdentifier =
|
||||
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
|
||||
|
||||
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
|
||||
.flatMap(Mono::justOrEmpty)
|
||||
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
|
||||
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
||||
.blockOptional()
|
||||
.orElse(Collections.emptyMap());
|
||||
resolveRecipients(multiRecipientMessage);
|
||||
|
||||
try {
|
||||
messageSender.sendMultiRecipientMessage(multiRecipientMessage,
|
||||
resolvedRecipients,
|
||||
request.getMessage().getTimestamp(),
|
||||
false,
|
||||
request.getEphemeral(),
|
||||
request.getUrgent(),
|
||||
timestamp,
|
||||
story,
|
||||
ephemeral,
|
||||
urgent,
|
||||
RequestAttributesUtil.getRawUserAgent().orElse(null));
|
||||
|
||||
final SendMultiRecipientMessageResponse.Builder responseBuilder = SendMultiRecipientMessageResponse.newBuilder();
|
||||
@@ -255,6 +295,62 @@ public class MessagesAnonymousGrpcService extends SimpleMessagesAnonymousGrpc.Me
|
||||
}
|
||||
}
|
||||
|
||||
private SealedSenderMultiRecipientMessage parseAndValidateMultiRecipientMessage(
|
||||
final byte[] serializedMultiRecipientMessage) throws StatusException {
|
||||
|
||||
final SealedSenderMultiRecipientMessage multiRecipientMessage;
|
||||
|
||||
try {
|
||||
multiRecipientMessage = SealedSenderMultiRecipientMessage.parse(serializedMultiRecipientMessage);
|
||||
} catch (final InvalidMessageException | InvalidVersionException e) {
|
||||
throw Status.INVALID_ARGUMENT.withCause(e).asException();
|
||||
}
|
||||
|
||||
// Check that the request is well-formed and doesn't contain repeated entries for the same device for the same
|
||||
// recipient
|
||||
validateNoDuplicateDevices(multiRecipientMessage);
|
||||
|
||||
return multiRecipientMessage;
|
||||
}
|
||||
|
||||
private void validateNoDuplicateDevices(final SealedSenderMultiRecipientMessage multiRecipientMessage)
|
||||
throws StatusException {
|
||||
|
||||
final boolean[] usedDeviceIds = new boolean[Device.MAXIMUM_DEVICE_ID + 1];
|
||||
|
||||
for (final SealedSenderMultiRecipientMessage.Recipient recipient : multiRecipientMessage.getRecipients().values()) {
|
||||
if (recipient.getDevices().length == 1) {
|
||||
// A recipient can't have repeated devices if they only have one device
|
||||
continue;
|
||||
}
|
||||
|
||||
Arrays.fill(usedDeviceIds, false);
|
||||
|
||||
for (final byte deviceId : recipient.getDevices()) {
|
||||
if (usedDeviceIds[deviceId]) {
|
||||
throw Status.INVALID_ARGUMENT.withDescription("Request contains repeated device entries").asException();
|
||||
}
|
||||
|
||||
usedDeviceIds[deviceId] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<SealedSenderMultiRecipientMessage.Recipient, Account> resolveRecipients(final SealedSenderMultiRecipientMessage multiRecipientMessage) {
|
||||
return Flux.fromIterable(multiRecipientMessage.getRecipients().entrySet())
|
||||
.flatMap(serviceIdAndRecipient -> {
|
||||
final ServiceIdentifier serviceIdentifier =
|
||||
ServiceIdentifier.fromLibsignal(serviceIdAndRecipient.getKey());
|
||||
|
||||
return Mono.fromFuture(() -> accountsManager.getByServiceIdentifierAsync(serviceIdentifier))
|
||||
.flatMap(Mono::justOrEmpty)
|
||||
.map(account -> Tuples.of(serviceIdAndRecipient.getValue(), account));
|
||||
}, MAX_FETCH_ACCOUNT_CONCURRENCY)
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
||||
.blockOptional()
|
||||
.orElse(Collections.emptyMap());
|
||||
}
|
||||
|
||||
private MismatchedDevices buildMismatchedDevices(final ServiceIdentifier serviceIdentifier,
|
||||
org.whispersystems.textsecuregcm.controllers.MismatchedDevices mismatchedDevices) {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user