Add routing for stories.

This commit is contained in:
erik-signal
2022-10-05 10:32:10 -04:00
committed by Erik Osheim
parent c2ab72c77e
commit 966c3a8f47
20 changed files with 425 additions and 86 deletions

View File

@@ -22,6 +22,7 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -33,7 +34,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.BadRequestException;
@@ -92,6 +95,7 @@ import org.whispersystems.textsecuregcm.util.Util;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
import org.whispersystems.websocket.Stories;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Path("/v1/messages")
@@ -164,7 +168,9 @@ public class MessageController {
@NotNull @Valid IncomingMessageList messages)
throws RateLimitExceededException {
if (source.isEmpty() && accessKey.isEmpty()) {
boolean isStory = messages.story();
if (source.isEmpty() && accessKey.isEmpty() && !isStory) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
@@ -204,11 +210,18 @@ public class MessageController {
destination = source.map(AuthenticatedAccount::getAccount);
}
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
// Stories will be checked by the client; we bypass access checks here for stories.
if (!isStory) {
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
}
assert (destination.isPresent());
if (source.isPresent() && !isSyncMessage) {
checkRateLimit(source.get(), destination.get(), userAgent);
checkMessageRateLimit(source.get(), destination.get(), userAgent);
}
if (isStory) {
checkStoryRateLimit(destination.get());
}
final Set<Long> excludedDeviceIds;
@@ -238,12 +251,12 @@ public class MessageController {
if (destinationDevice.isPresent()) {
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
sendMessage(source, destination.get(), destinationDevice.get(), destinationUuid, messages.timestamp(), messages.online(), messages.urgent(), incomingMessage, userAgent);
sendIndividualMessage(source, destination.get(), destinationDevice.get(), destinationUuid, messages.timestamp(), messages.online(), isStory, messages.urgent(), incomingMessage, userAgent);
}
}
return Response.ok(new SendMessageResponse(
!isSyncMessage && source.isPresent() && source.get().getAccount().getEnabledDeviceCount() > 1)).build();
boolean needsSync = !isSyncMessage && source.isPresent() && source.get().getAccount().getEnabledDeviceCount() > 1;
return Response.ok(new SendMessageResponse(needsSync)).build();
} catch (NoSuchUserException e) {
throw new WebApplicationException(Response.status(404).build());
} catch (MismatchedDevicesException e) {
@@ -260,6 +273,35 @@ public class MessageController {
}
}
/**
* Build mapping of accounts to devices/registration IDs.
* <p>
* Messages that are stories will only be sent to the subset of recipients who have indicated they want to receive
* stories.
*
* @param multiRecipientMessage
* @param uuidToAccountMap
* @return
*/
private Map<Account, Set<Pair<Long, Integer>>> buildDeviceIdAndRegistrationIdMap(
MultiRecipientMessage multiRecipientMessage,
Map<UUID, Account> uuidToAccountMap
) {
Stream<Recipient> recipients = Arrays.stream(multiRecipientMessage.getRecipients());
return recipients.collect(Collectors.toMap(
recipient -> uuidToAccountMap.get(recipient.getUuid()),
recipient -> new HashSet<>(
Collections.singletonList(new Pair<>(recipient.getDeviceId(), recipient.getRegistrationId()))),
(a, b) -> {
a.addAll(b);
return a;
}
));
}
@Timed
@Path("/multi_recipient")
@PUT
@@ -267,43 +309,51 @@ public class MessageController {
@Produces(MediaType.APPLICATION_JSON)
@FilterAbusiveMessages
public Response sendMultiRecipientMessage(
@HeaderParam(OptionalAccess.UNIDENTIFIED) CombinedUnidentifiedSenderAccessKeys accessKeys,
@HeaderParam(OptionalAccess.UNIDENTIFIED) @Nullable CombinedUnidentifiedSenderAccessKeys accessKeys,
@HeaderParam("User-Agent") String userAgent,
@HeaderParam("X-Forwarded-For") String forwardedFor,
@QueryParam("online") boolean online,
@QueryParam("ts") long timestamp,
@QueryParam("story") boolean isStory,
@NotNull @Valid MultiRecipientMessage multiRecipientMessage) {
Map<UUID, Account> uuidToAccountMap = Arrays.stream(multiRecipientMessage.getRecipients())
.map(Recipient::getUuid)
.distinct()
.collect(Collectors.toUnmodifiableMap(Function.identity(), uuid -> {
Optional<Account> account = accountsManager.getByAccountIdentifier(uuid);
if (account.isEmpty()) {
throw new WebApplicationException(Status.NOT_FOUND);
}
return account.get();
}));
checkAccessKeys(accessKeys, uuidToAccountMap);
.collect(Collectors.toUnmodifiableMap(
Function.identity(),
uuid -> accountsManager
.getByAccountIdentifier(uuid)
.orElseThrow(() -> new WebApplicationException(Status.NOT_FOUND))));
final Map<Account, HashSet<Pair<Long, Integer>>> accountToDeviceIdAndRegistrationIdMap =
Arrays
.stream(multiRecipientMessage.getRecipients())
.collect(Collectors.toMap(
recipient -> uuidToAccountMap.get(recipient.getUuid()),
recipient -> new HashSet<>(
Collections.singletonList(new Pair<>(recipient.getDeviceId(), recipient.getRegistrationId()))),
(a, b) -> {
a.addAll(b);
return a;
}
));
// Stories will be checked by the client; we bypass access checks here for stories.
if (!isStory) {
checkAccessKeys(accessKeys, uuidToAccountMap);
}
final Map<Account, Set<Pair<Long, Integer>>> accountToDeviceIdAndRegistrationIdMap =
buildDeviceIdAndRegistrationIdMap(multiRecipientMessage, uuidToAccountMap);
// We might filter out all the recipients of a story (if none have enabled stories).
// In this case there is no error so we should just return 200 now.
if (isStory && accountToDeviceIdAndRegistrationIdMap.isEmpty()) {
return Response.ok(new SendMultiRecipientMessageResponse(new LinkedList<>())).build();
}
Collection<AccountMismatchedDevices> accountMismatchedDevices = new ArrayList<>();
Collection<AccountStaleDevices> accountStaleDevices = new ArrayList<>();
uuidToAccountMap.values().forEach(account -> {
final Set<Long> deviceIds = accountToDeviceIdAndRegistrationIdMap.get(account).stream().map(Pair::first)
.collect(Collectors.toSet());
if (isStory) {
checkStoryRateLimit(account);
}
Set<Long> deviceIds = accountToDeviceIdAndRegistrationIdMap
.getOrDefault(account, Collections.emptySet())
.stream()
.map(Pair::first)
.collect(Collectors.toSet());
try {
DestinationDeviceValidator.validateCompleteDeviceList(account, deviceIds, Collections.emptySet());
@@ -351,8 +401,8 @@ public class MessageController {
Device destinationDevice = destinationAccount.getDevice(recipient.getDeviceId()).orElseThrow();
sentMessageCounter.increment();
try {
sendMessage(destinationAccount, destinationDevice, timestamp, online, recipient,
multiRecipientMessage.getCommonPayload());
sendCommonPayloadMessage(destinationAccount, destinationDevice, timestamp, online, isStory,
recipient, multiRecipientMessage.getCommonPayload());
} catch (NoSuchUserException e) {
uuids404.add(destinationAccount.getUuid());
}
@@ -367,6 +417,10 @@ public class MessageController {
}
private void checkAccessKeys(CombinedUnidentifiedSenderAccessKeys accessKeys, Map<UUID, Account> uuidToAccountMap) {
// We should not have null access keys when checking access; bail out early.
if (accessKeys == null) {
throw new WebApplicationException(Status.UNAUTHORIZED);
}
AtomicBoolean throwUnauthorized = new AtomicBoolean(false);
byte[] empty = new byte[16];
final Optional<byte[]> UNRESTRICTED_UNIDENTIFIED_ACCESS_KEY = Optional.of(new byte[16]);
@@ -405,8 +459,11 @@ public class MessageController {
@GET
@Produces(MediaType.APPLICATION_JSON)
public OutgoingMessageEntityList getPendingMessages(@Auth AuthenticatedAccount auth,
@HeaderParam(Stories.X_SIGNAL_RECEIVE_STORIES) String receiveStoriesHeader,
@HeaderParam("User-Agent") String userAgent) {
boolean shouldReceiveStories = Stories.parseReceiveStoriesHeader(receiveStoriesHeader);
pushNotificationManager.handleMessagesRetrieved(auth.getAccount(), auth.getAuthenticatedDevice(), userAgent);
final OutgoingMessageEntityList outgoingMessages;
@@ -416,7 +473,12 @@ public class MessageController {
auth.getAuthenticatedDevice().getId(),
false);
outgoingMessages = new OutgoingMessageEntityList(messagesAndHasMore.first().stream()
Stream<Envelope> envelopes = messagesAndHasMore.first().stream();
if (!shouldReceiveStories) {
envelopes = envelopes.filter(e -> !e.getStory());
}
outgoingMessages = new OutgoingMessageEntityList(envelopes
.map(OutgoingMessageEntity::fromEnvelope)
.peek(outgoingMessageEntity -> MessageMetrics.measureAccountOutgoingMessageUuidMismatches(auth.getAccount(),
outgoingMessageEntity))
@@ -514,12 +576,13 @@ public class MessageController {
.build();
}
private void sendMessage(Optional<AuthenticatedAccount> source,
private void sendIndividualMessage(Optional<AuthenticatedAccount> source,
Account destinationAccount,
Device destinationDevice,
UUID destinationUuid,
long timestamp,
boolean online,
boolean story,
boolean urgent,
IncomingMessage incomingMessage,
String userAgentString)
@@ -532,6 +595,7 @@ public class MessageController {
source.map(AuthenticatedAccount::getAccount).orElse(null),
source.map(authenticatedAccount -> authenticatedAccount.getAuthenticatedDevice().getId()).orElse(null),
timestamp == 0 ? System.currentTimeMillis() : timestamp,
story,
urgent);
} catch (final IllegalArgumentException e) {
logger.warn("Received bad envelope type {} from {}", incomingMessage.type(), userAgentString);
@@ -545,10 +609,11 @@ public class MessageController {
}
}
private void sendMessage(Account destinationAccount,
private void sendCommonPayloadMessage(Account destinationAccount,
Device destinationDevice,
long timestamp,
boolean online,
boolean story,
Recipient recipient,
byte[] commonPayload) throws NoSuchUserException {
try {
@@ -566,6 +631,7 @@ public class MessageController {
.setTimestamp(timestamp == 0 ? serverTimestamp : timestamp)
.setServerTimestamp(serverTimestamp)
.setContent(ByteString.copyFrom(payload))
.setStory(story)
.setDestinationUuid(destinationAccount.getUuid().toString());
messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
@@ -578,7 +644,14 @@ public class MessageController {
}
}
private void checkRateLimit(AuthenticatedAccount source, Account destination, String userAgent)
private void checkStoryRateLimit(Account destination) {
try {
rateLimiters.getMessagesLimiter().validate(destination.getUuid());
} catch (final RateLimitExceededException e) {
}
}
private void checkMessageRateLimit(AuthenticatedAccount source, Account destination, String userAgent)
throws RateLimitExceededException {
final String senderCountryCode = Util.getCountryCode(source.getAccount().getNumber());