Add a binary format for incoming messages

The existing, general incoming message endpoint accepts messages as
JSON strings containing base64 data, along with all the metadata as
other JSON keys. That's not very efficient, and we don't make use of
that full generality anyway. This commit introduces a new binary
format that supports everything we're using from the old format (with
the help of some query parameters like multi-recipient messages).
This commit is contained in:
Jordan Rose
2022-02-07 16:05:03 -08:00
committed by GitHub
parent 51bac394ec
commit 41bf2b2c42
9 changed files with 790 additions and 188 deletions

View File

@@ -139,6 +139,7 @@ import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge;
import org.whispersystems.textsecuregcm.metrics.OperatingSystemMemoryGauge;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.metrics.TrafficSource;
import org.whispersystems.textsecuregcm.providers.MultiDeviceMessageListProvider;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
@@ -585,6 +586,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.jersey().register(new ContentLengthFilter(TrafficSource.HTTP));
environment.jersey().register(acceptNumericOnlineFlagRequestFilter);
environment.jersey().register(MultiDeviceMessageListProvider.class);
environment.jersey().register(MultiRecipientMessageProvider.class);
environment.jersey().register(new MetricsApplicationEventListener(TrafficSource.HTTP));
environment.jersey()
@@ -607,6 +609,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.jersey().register(new WebsocketRefreshApplicationEventListener(accountsManager, clientPresenceManager));
webSocketEnvironment.jersey().register(new ContentLengthFilter(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(acceptNumericOnlineFlagRequestFilter);
webSocketEnvironment.jersey().register(MultiDeviceMessageListProvider.class);
webSocketEnvironment.jersey().register(MultiRecipientMessageProvider.class);
webSocketEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET));
webSocketEnvironment.jersey().register(new KeepAliveController(clientPresenceManager));

View File

@@ -62,6 +62,7 @@ import org.whispersystems.textsecuregcm.auth.CombinedUnidentifiedSenderAccessKey
import org.whispersystems.textsecuregcm.auth.OptionalAccess;
import org.whispersystems.textsecuregcm.entities.AccountMismatchedDevices;
import org.whispersystems.textsecuregcm.entities.AccountStaleDevices;
import org.whispersystems.textsecuregcm.entities.IncomingDeviceMessage;
import org.whispersystems.textsecuregcm.entities.IncomingMessage;
import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
@@ -77,6 +78,7 @@ import org.whispersystems.textsecuregcm.entities.StaleDevices;
import org.whispersystems.textsecuregcm.limits.RateLimitChallengeException;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.providers.MultiDeviceMessageListProvider;
import org.whispersystems.textsecuregcm.providers.MultiRecipientMessageProvider;
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
import org.whispersystems.textsecuregcm.push.MessageSender;
@@ -218,23 +220,15 @@ public class MessageController {
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
assert (destination.isPresent());
if (source.isPresent() && !source.get().getAccount().isIdentifiedBy(destinationUuid)) {
final String senderCountryCode = Util.getCountryCode(source.get().getAccount().getNumber());
try {
rateLimiters.getMessagesLimiter().validate(source.get().getAccount().getUuid(), destination.get().getUuid());
} catch (final RateLimitExceededException e) {
Metrics.counter(RATE_LIMITED_MESSAGE_COUNTER_NAME,
SENDER_COUNTRY_TAG_NAME, senderCountryCode,
RATE_LIMIT_REASON_TAG_NAME, "singleDestinationRate").increment();
throw e;
}
if (source.isPresent() && !isSyncMessage) {
checkRateLimit(source.get(), destination.get());
}
validateCompleteDeviceList(destination.get(), messages.getMessages(), isSyncMessage,
validateCompleteDeviceList(destination.get(), messages.getMessages(),
IncomingMessage::getDestinationDeviceId, isSyncMessage,
source.map(AuthenticatedAccount::getAuthenticatedDevice).map(Device::getId));
validateRegistrationIds(destination.get(), messages.getMessages());
validateRegistrationIds(destination.get(), messages.getMessages(),
IncomingMessage::getDestinationDeviceId, IncomingMessage::getDestinationRegistrationId);
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(messages.isOnline())),
@@ -245,8 +239,108 @@ public class MessageController {
if (destinationDevice.isPresent()) {
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
sendMessage(source, destination.get(), destinationDevice.get(), destinationUuid, messages.getTimestamp(),
messages.isOnline(), incomingMessage, userAgent);
sendMessage(source, destination.get(), destinationDevice.get(), destinationUuid, messages.getTimestamp(), messages.isOnline(), incomingMessage, userAgent);
}
}
return Response.ok(new SendMessageResponse(
!isSyncMessage && source.isPresent() && source.get().getAccount().getEnabledDeviceCount() > 1)).build();
} catch (NoSuchUserException e) {
throw new WebApplicationException(Response.status(404).build());
} catch (MismatchedDevicesException e) {
throw new WebApplicationException(Response.status(409)
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new MismatchedDevices(e.getMissingDevices(),
e.getExtraDevices()))
.build());
} catch (StaleDevicesException e) {
throw new WebApplicationException(Response.status(410)
.type(MediaType.APPLICATION_JSON)
.entity(new StaleDevices(e.getStaleDevices()))
.build());
}
}
@Timed
@Path("/{destination}")
@PUT
@Consumes(MultiDeviceMessageListProvider.MEDIA_TYPE)
@Produces(MediaType.APPLICATION_JSON)
@FilterAbusiveMessages
public Response sendMultiDeviceMessage(@Auth Optional<AuthenticatedAccount> source,
@HeaderParam(OptionalAccess.UNIDENTIFIED) Optional<Anonymous> accessKey,
@HeaderParam("User-Agent") String userAgent,
@HeaderParam("X-Forwarded-For") String forwardedFor,
@PathParam("destination") UUID destinationUuid,
@QueryParam("online") boolean online,
@QueryParam("ts") long timestamp,
@Valid IncomingDeviceMessage[] messages)
throws RateLimitExceededException, RateLimitChallengeException {
if (source.isEmpty() && accessKey.isEmpty()) {
throw new WebApplicationException(Response.Status.UNAUTHORIZED);
}
final String senderType;
if (source.isPresent()) {
if (source.get().getAccount().isIdentifiedBy(destinationUuid)) {
senderType = SENDER_TYPE_SELF;
} else {
senderType = SENDER_TYPE_IDENTIFIED;
}
} else {
senderType = SENDER_TYPE_UNIDENTIFIED;
}
for (final IncomingDeviceMessage message : messages) {
int contentLength = message.getContent().length;
Metrics.summary(CONTENT_SIZE_DISTRIBUTION_NAME, Tags.of(UserAgentTagUtil.getPlatformTag(userAgent))).record(contentLength);
if (contentLength > MAX_MESSAGE_SIZE) {
Metrics.counter(REJECT_OVERSIZE_MESSAGE_COUNTER, Tags.of(UserAgentTagUtil.getPlatformTag(userAgent))).increment();
return Response.status(Response.Status.REQUEST_ENTITY_TOO_LARGE).build();
}
}
try {
boolean isSyncMessage = source.isPresent() && source.get().getAccount().isIdentifiedBy(destinationUuid);
Optional<Account> destination;
if (!isSyncMessage) {
destination = accountsManager.getByAccountIdentifier(destinationUuid)
.or(() -> accountsManager.getByPhoneNumberIdentifier(destinationUuid));
} else {
destination = source.map(AuthenticatedAccount::getAccount);
}
OptionalAccess.verify(source.map(AuthenticatedAccount::getAccount), accessKey, destination);
assert (destination.isPresent());
if (source.isPresent() && !isSyncMessage) {
checkRateLimit(source.get(), destination.get());
}
final List<IncomingDeviceMessage> messagesAsList = Arrays.asList(messages);
validateCompleteDeviceList(destination.get(), messagesAsList,
IncomingDeviceMessage::getDeviceId, isSyncMessage,
source.map(AuthenticatedAccount::getAuthenticatedDevice).map(Device::getId));
validateRegistrationIds(destination.get(), messagesAsList,
IncomingDeviceMessage::getDeviceId,
IncomingDeviceMessage::getRegistrationId);
final List<Tag> tags = List.of(UserAgentTagUtil.getPlatformTag(userAgent),
Tag.of(EPHEMERAL_TAG_NAME, String.valueOf(online)),
Tag.of(SENDER_TYPE_TAG_NAME, senderType));
for (final IncomingDeviceMessage message : messages) {
Optional<Device> destinationDevice = destination.get().getDevice(message.getDeviceId());
if (destinationDevice.isPresent()) {
Metrics.counter(SENT_MESSAGE_COUNTER_NAME, tags).increment();
sendMessage(source, destination.get(), destinationDevice.get(), destinationUuid, timestamp, online, message, userAgent);
}
}
@@ -544,6 +638,33 @@ public class MessageController {
}
}
private void sendMessage(Optional<AuthenticatedAccount> source, Account destinationAccount, Device destinationDevice, UUID destinationUuid, long timestamp, boolean online, IncomingDeviceMessage message, String userAgentString) throws NoSuchUserException {
try {
Envelope.Builder messageBuilder = Envelope.newBuilder();
long serverTimestamp = System.currentTimeMillis();
messageBuilder
.setType(Envelope.Type.forNumber(message.getType()))
.setTimestamp(timestamp == 0 ? serverTimestamp : timestamp)
.setServerTimestamp(serverTimestamp)
.setDestinationUuid(destinationUuid.toString())
.setContent(ByteString.copyFrom(message.getContent()));
source.ifPresent(authenticatedAccount ->
messageBuilder.setSource(authenticatedAccount.getAccount().getNumber())
.setSourceUuid(authenticatedAccount.getAccount().getUuid().toString())
.setSourceDevice((int) authenticatedAccount.getAuthenticatedDevice().getId()));
messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
} catch (NotPushRegisteredException e) {
if (destinationDevice.isMaster()) {
throw new NoSuchUserException(e);
} else {
logger.debug("Not registered", e);
}
}
}
private void sendMessage(Account destinationAccount,
Device destinationDevice,
long timestamp,
@@ -577,12 +698,26 @@ public class MessageController {
}
}
private void checkRateLimit(AuthenticatedAccount source, Account destination) throws RateLimitExceededException {
final String senderCountryCode = Util.getCountryCode(source.getAccount().getNumber());
try {
rateLimiters.getMessagesLimiter().validate(source.getAccount().getUuid(), destination.getUuid());
} catch (final RateLimitExceededException e) {
Metrics.counter(RATE_LIMITED_MESSAGE_COUNTER_NAME,
SENDER_COUNTRY_TAG_NAME, senderCountryCode,
RATE_LIMIT_REASON_TAG_NAME, "singleDestinationRate").increment();
throw e;
}
}
@VisibleForTesting
public static void validateRegistrationIds(Account account, List<IncomingMessage> messages)
public static <T> void validateRegistrationIds(Account account, List<T> messages, Function<T, Long> getDeviceId, Function<T, Integer> getRegistrationId)
throws StaleDevicesException {
final Stream<Pair<Long, Integer>> deviceIdAndRegistrationIdStream = messages
.stream()
.map(message -> new Pair<>(message.getDestinationDeviceId(), message.getDestinationRegistrationId()));
.map(message -> new Pair<>(getDeviceId.apply(message), getRegistrationId.apply(message)));
validateRegistrationIds(account, deviceIdAndRegistrationIdStream);
}
@@ -604,10 +739,10 @@ public class MessageController {
}
@VisibleForTesting
public static void validateCompleteDeviceList(Account account, List<IncomingMessage> messages, boolean isSyncMessage,
public static <T> void validateCompleteDeviceList(Account account, List<T> messages, Function<T, Long> getDeviceId, boolean isSyncMessage,
Optional<Long> authenticatedDeviceId)
throws MismatchedDevicesException {
Set<Long> messageDeviceIds = messages.stream().map(IncomingMessage::getDestinationDeviceId)
Set<Long> messageDeviceIds = messages.stream().map(getDeviceId)
.collect(Collectors.toSet());
validateCompleteDeviceList(account, messageDeviceIds, isSyncMessage, authenticatedDeviceId);
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.entities;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
public class IncomingDeviceMessage {
private final int type;
@Min(1)
private final long deviceId;
@Min(0)
@Max(65536)
private final int registrationId;
@NotNull
private final byte[] content;
public IncomingDeviceMessage(int type, long deviceId, int registrationId, byte[] content) {
this.type = type;
this.deviceId = deviceId;
this.registrationId = registrationId;
this.content = content;
}
public int getType() {
return type;
}
public long getDeviceId() {
return deviceId;
}
public int getRegistrationId() {
return registrationId;
}
public byte[] getContent() {
return content;
}
}

View File

@@ -0,0 +1,90 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.providers;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.WebApplicationException;
public abstract class BinaryProviderBase {
/**
* Reads a UUID in network byte order and converts to a UUID object.
*/
UUID readUuid(InputStream stream) throws IOException {
byte[] buffer = new byte[8];
int read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long msb = convertNetworkByteOrderToLong(buffer);
read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long lsb = convertNetworkByteOrderToLong(buffer);
return new UUID(msb, lsb);
}
private long convertNetworkByteOrderToLong(byte[] buffer) {
long result = 0;
for (int i = 0; i < 8; i++) {
result = (result << 8) | (buffer[i] & 0xFFL);
}
return result;
}
/**
* Reads a varint. A varint larger than 64 bits is rejected with a {@code WebApplicationException}. An
* {@code IOException} is thrown if the stream ends before we finish reading the varint.
*
* @return the varint value
*/
static long readVarint(InputStream stream) throws IOException, WebApplicationException {
boolean hasMore = true;
int currentOffset = 0;
long result = 0;
while (hasMore) {
if (currentOffset >= 64) {
throw new BadRequestException("varint is too large");
}
int b = stream.read();
if (b == -1) {
throw new IOException("Missing byte " + (currentOffset / 7) + " of varint");
}
if (currentOffset == 63 && (b & 0xFE) != 0) {
throw new BadRequestException("varint is too large");
}
hasMore = (b & 0x80) != 0;
result |= ((long)(b & 0x7F)) << currentOffset;
currentOffset += 7;
}
return result;
}
/**
* Reads two bytes with most significant byte first. Treats the value as unsigned so the range returned is
* {@code [0, 65535]}.
*/
@VisibleForTesting
static int readU16(InputStream stream) throws IOException {
int b1 = stream.read();
if (b1 == -1) {
throw new IOException("Missing byte 1 of U16");
}
int b2 = stream.read();
if (b2 == -1) {
throw new IOException("Missing byte 2 of U16");
}
return (b1 << 8) | b2;
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.providers;
import io.dropwizard.util.DataSizeUnit;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.NoContentException;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.Provider;
import org.whispersystems.textsecuregcm.entities.IncomingDeviceMessage;
@Provider
@Consumes(MultiDeviceMessageListProvider.MEDIA_TYPE)
public class MultiDeviceMessageListProvider extends BinaryProviderBase implements MessageBodyReader<IncomingDeviceMessage[]> {
public static final String MEDIA_TYPE = "application/vnd.signal-messenger.mdml";
public static final int MAX_MESSAGE_COUNT = 50;
public static final int MAX_MESSAGE_SIZE = Math.toIntExact(DataSizeUnit.KIBIBYTES.toBytes(256));
public static final byte VERSION = 0x01;
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return MEDIA_TYPE.equals(mediaType.toString()) && IncomingDeviceMessage[].class.isAssignableFrom(type);
}
@Override
public IncomingDeviceMessage[]
readFrom(Class<IncomingDeviceMessage[]> resultType, Type genericType,
Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
InputStream entityStream)
throws IOException, WebApplicationException {
int versionByte = entityStream.read();
if (versionByte == -1) {
throw new NoContentException("Empty body not allowed");
}
if (versionByte != VERSION) {
throw new BadRequestException("Unsupported version");
}
int count = entityStream.read();
if (count == -1) {
throw new IOException("Missing count");
}
if (count > MAX_MESSAGE_COUNT) {
throw new BadRequestException("Maximum recipient count exceeded");
}
IncomingDeviceMessage[] messages = new IncomingDeviceMessage[count];
for (int i = 0; i < count; i++) {
long deviceId = readVarint(entityStream);
int registrationId = readU16(entityStream);
int type = entityStream.read();
if (type == -1) {
throw new IOException("Unexpected end of stream reading message type");
}
long messageLength = readVarint(entityStream);
if (messageLength > MAX_MESSAGE_SIZE) {
throw new BadRequestException("Message body too large");
}
byte[] contents = entityStream.readNBytes(Math.toIntExact(messageLength));
if (contents.length != messageLength) {
throw new IOException("Unexpected end of stream in the middle of message contents");
}
messages[i] = new IncomingDeviceMessage(type, deviceId, registrationId, contents);
}
return messages;
}
}

View File

@@ -5,7 +5,6 @@
package org.whispersystems.textsecuregcm.providers;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.util.DataSizeUnit;
import java.io.IOException;
import java.io.InputStream;
@@ -24,7 +23,7 @@ import org.whispersystems.textsecuregcm.entities.MultiRecipientMessage;
@Provider
@Consumes(MultiRecipientMessageProvider.MEDIA_TYPE)
public class MultiRecipientMessageProvider implements MessageBodyReader<MultiRecipientMessage> {
public class MultiRecipientMessageProvider extends BinaryProviderBase implements MessageBodyReader<MultiRecipientMessage> {
public static final String MEDIA_TYPE = "application/vnd.signal-messenger.mrm";
public static final int MAX_RECIPIENT_COUNT = 5000;
@@ -71,78 +70,4 @@ public class MultiRecipientMessageProvider implements MessageBodyReader<MultiRec
}
return new MultiRecipientMessage(recipients, commonPayload);
}
/**
* Reads a UUID in network byte order and converts to a UUID object.
*/
private UUID readUuid(InputStream stream) throws IOException {
byte[] buffer = new byte[8];
int read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long msb = convertNetworkByteOrderToLong(buffer);
read = stream.readNBytes(buffer, 0, 8);
if (read != 8) {
throw new IOException("Insufficient bytes for UUID");
}
long lsb = convertNetworkByteOrderToLong(buffer);
return new UUID(msb, lsb);
}
private long convertNetworkByteOrderToLong(byte[] buffer) {
long result = 0;
for (int i = 0; i < 8; i++) {
result = (result << 8) | (buffer[i] & 0xFFL);
}
return result;
}
/**
* Reads a varint. A varint larger than 64 bits is rejected with a {@code WebApplicationException}. An
* {@code IOException} is thrown if the stream ends before we finish reading the varint.
*
* @return the varint value
*/
private long readVarint(InputStream stream) throws IOException, WebApplicationException {
boolean hasMore = true;
int currentOffset = 0;
int result = 0;
while (hasMore) {
if (currentOffset >= 64) {
throw new BadRequestException("varint is too large");
}
int b = stream.read();
if (b == -1) {
throw new IOException("Missing byte " + (currentOffset / 7) + " of varint");
}
if (currentOffset == 63 && (b & 0xFE) != 0) {
throw new BadRequestException("varint is too large");
}
hasMore = (b & 0x80) != 0;
result |= (b & 0x7F) << currentOffset;
currentOffset += 7;
}
return result;
}
/**
* Reads two bytes with most significant byte first. Treats the value as unsigned so the range returned is
* {@code [0, 65535]}.
*/
@VisibleForTesting
static int readU16(InputStream stream) throws IOException {
int b1 = stream.read();
if (b1 == -1) {
throw new IOException("Missing byte 1 of U16");
}
int b2 = stream.read();
if (b2 == -1) {
throw new IOException("Missing byte 2 of U16");
}
return (b1 << 8) | b2;
}
}